001package ca.uhn.fhir.jpa.subscription.module.subscriber; 002 003import ca.uhn.fhir.context.FhirContext; 004import ca.uhn.fhir.interceptor.api.HookParams; 005import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 006import ca.uhn.fhir.interceptor.api.Pointcut; 007import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 008import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; 009import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; 010import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; 011import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; 012import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; 013import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; 014import ca.uhn.fhir.rest.api.Constants; 015import ca.uhn.fhir.rest.api.EncodingEnum; 016import org.apache.commons.lang3.StringUtils; 017import org.hl7.fhir.instance.model.api.IBaseResource; 018import org.hl7.fhir.instance.model.api.IIdType; 019import org.slf4j.Logger; 020import org.slf4j.LoggerFactory; 021import org.springframework.beans.factory.annotation.Autowired; 022import org.springframework.messaging.Message; 023import org.springframework.messaging.MessageChannel; 024import org.springframework.messaging.MessageHandler; 025import org.springframework.messaging.MessagingException; 026import org.springframework.stereotype.Service; 027 028import java.util.Collection; 029 030import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 031import static org.apache.commons.lang3.StringUtils.isNotBlank; 032 033/*- 034 * #%L 035 * HAPI FHIR Subscription Server 036 * %% 037 * Copyright (C) 2014 - 2020 University Health Network 038 * %% 039 * Licensed under the Apache License, Version 2.0 (the "License"); 040 * you may not use this file except in compliance with the License. 041 * You may obtain a copy of the License at 042 * 043 * http://www.apache.org/licenses/LICENSE-2.0 044 * 045 * Unless required by applicable law or agreed to in writing, software 046 * distributed under the License is distributed on an "AS IS" BASIS, 047 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 048 * See the License for the specific language governing permissions and 049 * limitations under the License. 050 * #L% 051 */ 052 053@Service 054public class SubscriptionMatchingSubscriber implements MessageHandler { 055 private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class); 056 057 @Autowired 058 private ISubscriptionMatcher mySubscriptionMatcher; 059 @Autowired 060 private FhirContext myFhirContext; 061 @Autowired 062 private SubscriptionRegistry mySubscriptionRegistry; 063 @Autowired 064 private IInterceptorBroadcaster myInterceptorBroadcaster; 065 @Autowired 066 private SubscriptionChannelRegistry mySubscriptionChannelRegistry; 067 068 @Override 069 public void handleMessage(Message<?> theMessage) throws MessagingException { 070 ourLog.trace("Handling resource modified message: {}", theMessage); 071 072 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 073 ourLog.warn("Unexpected message payload type: {}", theMessage); 074 return; 075 } 076 077 ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 078 matchActiveSubscriptionsAndDeliver(msg); 079 080 } 081 082 public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) { 083 switch (theMsg.getOperationType()) { 084 case CREATE: 085 case UPDATE: 086 case MANUALLY_TRIGGERED: 087 break; 088 case DELETE: 089 default: 090 ourLog.trace("Not processing modified message for {}", theMsg.getOperationType()); 091 // ignore anything else 092 return; 093 } 094 095 // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED 096 HookParams params = new HookParams() 097 .add(ResourceModifiedMessage.class, theMsg); 098 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) { 099 return; 100 } 101 102 try { 103 doMatchActiveSubscriptionsAndDeliver(theMsg); 104 } finally { 105 // Interceptor call: SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED 106 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, params); 107 } 108 } 109 110 private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) { 111 IIdType resourceId = theMsg.getId(myFhirContext); 112 Boolean isText = false; 113 114 Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll(); 115 116 ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size()); 117 boolean resourceMatched = false; 118 119 for (ActiveSubscription nextActiveSubscription : subscriptions) { 120 121 String nextSubscriptionId = getId(nextActiveSubscription); 122 123 if (isNotBlank(theMsg.getSubscriptionId())) { 124 if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { 125 // TODO KHS we should use a hash to look it up instead of this full table scan 126 ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId()); 127 continue; 128 } 129 } 130 131 if (!validCriteria(nextActiveSubscription, resourceId)) { 132 continue; 133 } 134 135 InMemoryMatchResult matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg); 136 if (!matchResult.matched()) { 137 continue; 138 } 139 ourLog.debug("Subscription {} was matched by resource {} {}", 140 nextActiveSubscription.getId(), 141 resourceId.toUnqualifiedVersionless().getValue(), 142 matchResult.isInMemory() ? "in-memory" : "by querying the repository"); 143 144 IBaseResource payload = theMsg.getNewPayload(myFhirContext); 145 CanonicalSubscription subscription = nextActiveSubscription.getSubscription(); 146 147 EncodingEnum encoding = null; 148 if (subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) { 149 encoding = EncodingEnum.forContentType(subscription.getPayloadString()); 150 isText = subscription.getPayloadString().equals(Constants.CT_TEXT); 151 } 152 encoding = defaultIfNull(encoding, EncodingEnum.JSON); 153 154 ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); 155 156 deliveryMsg.setPayload(myFhirContext, payload, encoding); 157 deliveryMsg.setSubscription(subscription); 158 deliveryMsg.setOperationType(theMsg.getOperationType()); 159 deliveryMsg.copyAdditionalPropertiesFrom(theMsg); 160 161 // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED 162 HookParams params = new HookParams() 163 .add(CanonicalSubscription.class, nextActiveSubscription.getSubscription()) 164 .add(ResourceDeliveryMessage.class, deliveryMsg) 165 .add(InMemoryMatchResult.class, matchResult); 166 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) { 167 return; 168 } 169 170 resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg); 171 } 172 173 if (!resourceMatched) { 174 // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED 175 HookParams params = new HookParams() 176 .add(ResourceModifiedMessage.class, theMsg); 177 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params); 178 } 179 } 180 181 private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { 182 boolean retval = false; 183 ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); 184 MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel(); 185 if (deliveryChannel != null) { 186 retval = true; 187 trySendToDeliveryChannel(wrappedMsg, deliveryChannel); 188 } else { 189 ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId()); 190 } 191 return retval; 192 } 193 194 private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) { 195 try { 196 boolean success = theDeliveryChannel.send(theWrappedMsg); 197 if (!success) { 198 ourLog.warn("Failed to send message to Delivery Channel."); 199 } 200 } catch (RuntimeException e) { 201 ourLog.error("Failed to send message to Delivery Channel", e); 202 throw new RuntimeException("Failed to send message to Delivery Channel", e); 203 } 204 } 205 206 private String getId(ActiveSubscription theActiveSubscription) { 207 return theActiveSubscription.getId(); 208 } 209 210 private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) { 211 String criteriaString = theActiveSubscription.getCriteriaString(); 212 String subscriptionId = getId(theActiveSubscription); 213 String resourceType = theResourceId.getResourceType(); 214 215 if (StringUtils.isBlank(criteriaString)) { 216 return false; 217 } 218 219 // see if the criteria matches the created object 220 ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteriaString); 221 String criteriaResource = criteriaString; 222 int index = criteriaResource.indexOf("?"); 223 if (index != -1) { 224 criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?")); 225 } 226 227 if (resourceType != null && !criteriaResource.equals(resourceType)) { 228 ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, criteriaString); 229 return false; 230 } 231 232 return true; 233 } 234}