001package ca.uhn.fhir.jpa.subscription.module;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.util.StopWatch;
024import com.google.common.annotations.VisibleForTesting;
025import org.apache.commons.lang3.concurrent.BasicThreadFactory;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.springframework.messaging.Message;
029import org.springframework.messaging.MessageHandler;
030import org.springframework.messaging.SubscribableChannel;
031import org.springframework.messaging.support.ChannelInterceptor;
032import org.springframework.messaging.support.ExecutorSubscribableChannel;
033
034import java.util.ArrayList;
035import java.util.concurrent.*;
036
037public class LinkedBlockingQueueSubscribableChannel implements SubscribableChannel {
038        private Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueSubscribableChannel.class);
039
040        private final ExecutorSubscribableChannel mySubscribableChannel;
041        private final BlockingQueue<Runnable> myQueue;
042
043        public LinkedBlockingQueueSubscribableChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern, int theConcurrentConsumers) {
044
045                ThreadFactory threadFactory = new BasicThreadFactory.Builder()
046                        .namingPattern(theThreadNamingPattern)
047                        .daemon(false)
048                        .priority(Thread.NORM_PRIORITY)
049                        .build();
050                RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
051                        ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", theQueue.size());
052                        StopWatch sw = new StopWatch();
053                        try {
054                                theQueue.put(theRunnable);
055                        } catch (InterruptedException e) {
056                                Thread.currentThread().interrupt();
057                                throw new RejectedExecutionException("Task " + theRunnable.toString() +
058                                        " rejected from " + e.toString());
059                        }
060                        ourLog.info("Slot become available after {}ms", sw.getMillis());
061                };
062                ThreadPoolExecutor executor = new ThreadPoolExecutor(
063                        1,
064                        theConcurrentConsumers,
065                        0L,
066                        TimeUnit.MILLISECONDS,
067                        theQueue,
068                        threadFactory,
069                        rejectedExecutionHandler);
070                myQueue = theQueue;
071                mySubscribableChannel = new ExecutorSubscribableChannel(executor);
072        }
073
074        @Override
075        public boolean subscribe(MessageHandler handler) {
076                return mySubscribableChannel.subscribe(handler);
077        }
078
079        @Override
080        public boolean unsubscribe(MessageHandler handler) {
081                return mySubscribableChannel.unsubscribe(handler);
082        }
083
084        @Override
085        public boolean send(Message<?> message, long timeout) {
086                return mySubscribableChannel.send(message, timeout);
087        }
088
089        @VisibleForTesting
090        public void clearInterceptorsForUnitTest() {
091                mySubscribableChannel.setInterceptors(new ArrayList<>());
092        }
093
094        @VisibleForTesting
095        public void addInterceptorForUnitTest(ChannelInterceptor theInterceptor) {
096                mySubscribableChannel.addInterceptor(theInterceptor);
097        }
098
099        @VisibleForTesting
100        public int getQueueSizeForUnitTest() {
101                return myQueue.size();
102        }
103}