001package ca.uhn.fhir.jpa.subscription.module; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.util.StopWatch; 024import com.google.common.annotations.VisibleForTesting; 025import org.apache.commons.lang3.concurrent.BasicThreadFactory; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.springframework.messaging.Message; 029import org.springframework.messaging.MessageHandler; 030import org.springframework.messaging.SubscribableChannel; 031import org.springframework.messaging.support.ChannelInterceptor; 032import org.springframework.messaging.support.ExecutorSubscribableChannel; 033 034import java.util.ArrayList; 035import java.util.concurrent.*; 036 037public class LinkedBlockingQueueSubscribableChannel implements SubscribableChannel { 038 private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class); 039 040 private final ExecutorSubscribableChannel mySubscribableChannel; 041 private final BlockingQueue<Runnable> myQueue; 042 043 public LinkedBlockingQueueSubscribableChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern, int theConcurrentConsumers) { 044 045 ThreadFactory threadFactory = new BasicThreadFactory.Builder() 046 .namingPattern(theThreadNamingPattern) 047 .daemon(false) 048 .priority(Thread.NORM_PRIORITY) 049 .build(); 050 RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> { 051 ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", theQueue.size()); 052 StopWatch sw = new StopWatch(); 053 try { 054 theQueue.put(theRunnable); 055 } catch (InterruptedException e) { 056 Thread.currentThread().interrupt(); 057 throw new RejectedExecutionException("Task " + theRunnable.toString() + 058 " rejected from " + e.toString()); 059 } 060 ourLog.info("Slot become available after {}ms", sw.getMillis()); 061 }; 062 ThreadPoolExecutor executor = new ThreadPoolExecutor( 063 1, 064 theConcurrentConsumers, 065 0L, 066 TimeUnit.MILLISECONDS, 067 theQueue, 068 threadFactory, 069 rejectedExecutionHandler); 070 myQueue = theQueue; 071 mySubscribableChannel = new ExecutorSubscribableChannel(executor); 072 } 073 074 @Override 075 public boolean subscribe(MessageHandler handler) { 076 return mySubscribableChannel.subscribe(handler); 077 } 078 079 @Override 080 public boolean unsubscribe(MessageHandler handler) { 081 return mySubscribableChannel.unsubscribe(handler); 082 } 083 084 @Override 085 public boolean send(Message<?> message, long timeout) { 086 return mySubscribableChannel.send(message, timeout); 087 } 088 089 @VisibleForTesting 090 public void clearInterceptorsForUnitTest() { 091 mySubscribableChannel.setInterceptors(new ArrayList<>()); 092 } 093 094 @VisibleForTesting 095 public void addInterceptorForUnitTest(ChannelInterceptor theInterceptor) { 096 mySubscribableChannel.addInterceptor(theInterceptor); 097 } 098 099 @VisibleForTesting 100 public int getQueueSizeForUnitTest() { 101 return myQueue.size(); 102 } 103}