001package ca.uhn.fhir.jpa.subscription.module.subscriber; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.interceptor.api.HookParams; 024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 025import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.rest.api.EncodingEnum; 028import ca.uhn.fhir.rest.api.RequestTypeEnum; 029import ca.uhn.fhir.rest.client.api.*; 030import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor; 031import ca.uhn.fhir.rest.gclient.IClientExecutable; 032import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 033import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 034import org.apache.commons.lang3.StringUtils; 035import org.hl7.fhir.instance.model.api.IBaseResource; 036import org.hl7.fhir.instance.model.api.IIdType; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.springframework.beans.factory.annotation.Autowired; 040import org.springframework.context.annotation.Scope; 041import org.springframework.messaging.MessagingException; 042import org.springframework.stereotype.Component; 043 044import java.io.IOException; 045import java.util.*; 046 047import static org.apache.commons.lang3.StringUtils.isNotBlank; 048 049@Component 050@Scope("prototype") 051public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDeliverySubscriber { 052 @Autowired 053 IResourceRetriever myResourceRetriever; 054 private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class); 055 056 protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) { 057 IBaseResource payloadResource = getAndMassagePayload(theMsg, theSubscription); 058 059 // Regardless of whether we have a payload, the rest-hook should be sent. 060 doDelivery(theMsg, theSubscription, thePayloadType, theClient, payloadResource); 061 } 062 063 protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient, IBaseResource thePayloadResource) { 064 IClientExecutable<?, ?> operation; 065 switch (theMsg.getOperationType()) { 066 case CREATE: 067 case UPDATE: 068 if (thePayloadResource == null || thePayloadResource.isEmpty()) { 069 if (thePayloadType != null) { 070 operation = theClient.create().resource(thePayloadResource); 071 } else { 072 sendNotification(theMsg); 073 return; 074 } 075 } else { 076 if (thePayloadType != null) { 077 operation = theClient.update().resource(thePayloadResource); 078 } else { 079 sendNotification(theMsg); 080 return; 081 } 082 } 083 break; 084 case DELETE: 085 operation = theClient.delete().resourceById(theMsg.getPayloadId(myFhirContext)); 086 break; 087 default: 088 ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType()); 089 return; 090 } 091 092 if (thePayloadType != null) { 093 operation.encoded(thePayloadType); 094 } 095 096 String payloadId = null; 097 if (thePayloadResource != null) { 098 payloadId = thePayloadResource.getIdElement().toUnqualified().getValue(); 099 } 100 ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); 101 102 try { 103 operation.execute(); 104 } catch (ResourceNotFoundException e) { 105 ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl()); 106 ourLog.error("Exception: ", e); 107 throw e; 108 } 109 } 110 111 protected IBaseResource getAndMassagePayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription) { 112 IBaseResource payloadResource = theMsg.getPayload(myFhirContext); 113 114 if (payloadResource == null || theSubscription.getRestHookDetails().isDeliverLatestVersion()) { 115 IIdType payloadId = theMsg.getPayloadId(myFhirContext); 116 117 try { 118 if (payloadId != null) { 119 payloadResource = myResourceRetriever.getResource(payloadId.toVersionless()); 120 } else { 121 return null; 122 } 123 } catch (ResourceGoneException e) { 124 ourLog.warn("Resource {} is deleted, not going to deliver for subscription {}", payloadId.toVersionless(), theSubscription.getIdElement(myFhirContext)); 125 return null; 126 } 127 } 128 129 IIdType resourceId = payloadResource.getIdElement(); 130 if (theSubscription.getRestHookDetails().isStripVersionId()) { 131 resourceId = resourceId.toVersionless(); 132 payloadResource.setId(resourceId); 133 } 134 return payloadResource; 135 } 136 137 @Override 138 public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException { 139 CanonicalSubscription subscription = theMessage.getSubscription(); 140 141 // Interceptor call: SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY 142 HookParams params = new HookParams() 143 .add(CanonicalSubscription.class, subscription) 144 .add(ResourceDeliveryMessage.class, theMessage); 145 if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY, params)) { 146 return; 147 } 148 149 // Grab the endpoint from the subscription 150 String endpointUrl = subscription.getEndpointUrl(); 151 152 // Grab the payload type (encoding mimetype) from the subscription 153 String payloadString = subscription.getPayloadString(); 154 EncodingEnum payloadType = null; 155 if (payloadString != null) { 156 payloadType = EncodingEnum.forContentType(payloadString); 157 } 158 159 // Create the client request 160 myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER); 161 IGenericClient client = null; 162 if (isNotBlank(endpointUrl)) { 163 client = myFhirContext.newRestfulGenericClient(endpointUrl); 164 165 // Additional headers specified in the subscription 166 List<String> headers = subscription.getHeaders(); 167 for (String next : headers) { 168 if (isNotBlank(next)) { 169 client.registerInterceptor(new SimpleRequestHeaderInterceptor(next)); 170 } 171 } 172 } 173 174 deliverPayload(theMessage, subscription, payloadType, client); 175 176 // Interceptor call: SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY 177 params = new HookParams() 178 .add(CanonicalSubscription.class, subscription) 179 .add(ResourceDeliveryMessage.class, theMessage); 180 if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY, params)) { 181 //noinspection UnnecessaryReturnStatement 182 return; 183 } 184 185 } 186 187 /** 188 * Sends a POST notification without a payload 189 */ 190 protected void sendNotification(ResourceDeliveryMessage theMsg) { 191 Map<String, List<String>> params = new HashMap<>(); 192 List<Header> headers = new ArrayList<>(); 193 if (theMsg.getSubscription().getHeaders() != null) { 194 theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> { 195 final int sep = h.indexOf(':'); 196 if (sep > 0) { 197 final String name = h.substring(0, sep); 198 final String value = h.substring(sep + 1); 199 if (StringUtils.isNotBlank(name)) { 200 headers.add(new Header(name.trim(), value.trim())); 201 } 202 } 203 }); 204 } 205 206 StringBuilder url = new StringBuilder(theMsg.getSubscription().getEndpointUrl()); 207 IHttpClient client = myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers); 208 IHttpRequest request = client.createParamRequest(myFhirContext, params, null); 209 try { 210 IHttpResponse response = request.execute(); 211 // close connection in order to return a possible cached connection to the connection pool 212 response.close(); 213 } catch (IOException e) { 214 ourLog.error("Error trying to reach " + theMsg.getSubscription().getEndpointUrl()); 215 e.printStackTrace(); 216 throw new ResourceNotFoundException(e.getMessage()); 217 } 218 } 219}