생산자 소비자 패턴은 생산자 스레드가 생산한 자료를 큐에 넣어 놓으면 소비자 스레드가 그걸 가져가서 처리하는 패턴을 말한다. 자료구조는 일반적인 큐와 lock을 함께 사용하거나 스레드에 안전한 ConcurrentQueue를 사용하면 되지 않을까 싶다. 단순하게 얘기하면 이렇게 간단하지만 조건에 따라서 굉장히 복잡해질 수 있다.

 

1. 생산할 데이터가 고정된 개수인가 무제한인가?

2. 큐의 사이즈는 고정인가 무제한인가?

3. 큐에 들어간 자료는 순서대로 처리되어야 하는가?

4. 생산자 스레드는 1개인가 여러 개인가?

5. 소비자 스레드는 1개인가 여러 개인가?

6. 스레드에서 데이터를 처리하는 시간은 얼마나 짧은가?

7. 생산보다 소비 속도가 빠른 경우, 큐에 데이터가 입력될 때까지 소비자 스레드가 대기하도록 해서 CPU를 낭비하지 않게 만들 수 있는가?

8. 생산자가 정상적으로 생산을 중단한 경우 소비자에게 알리는 방법?

 

.. 등등이 있다.

 

BlockingCollection을 이용하면 위와 같은 고민의 대부분을 해결할 수 있다. BlockingCollection 클래스는 내부적으로는 ConcurrentQueue를 사용하고 스레드가 동기화를 하기 편하게 만들어 놓은 클래스이다.

 

기본적인 함수는 다음과 같다.

// 생성자. 사이즈 제한을 걸 수 있는데, 최대 사이즈에 도달하면 생산자에 block이 걸린다.
public BlockingCollection();
public BlockingCollection(int boundedCapacity);

// 큐에 들어 있는 데이터 개수
public int Count { get; }

// 데이터 추가가 중단되었는가?
public bool IsAddingCompleted { get; }

// 데이터 추가가 중단되었고, 큐도 비었는가?
public bool IsCompleted { get; }

// 큐에 데이터 추가하기
// IsAddingCompleted 상태이면 예외가 발생한다
public void Add(T item);

// 데이터 추가 중단하도록 설정하기
// 이 함수 호출하면 이후 IsAddingCompleted가 true가 된다.
public void CompleteAdding();

// 큐에서 데이터 꺼내오기
// 큐에 데이터가 없으면, 계속 대기한다.
// IsCompleted 상태가 되면 예외가 발생한다.
public T Take();

// 큐에서 데이터 꺼내오기
// Take 함수와 기능이 동일하지만, cancellationToken가 설정되어 있으면 예외가 발생한다.
// 스레드 대기 중단을 위해선 이것을 사용해야 한다.
public T Take(CancellationToken cancellationToken);

생산자 스레드에서 Add 함수를 이용해서 큐에 데이터를 넣고, 소비자 스레드에서 Take함수를 이용해서 데이터를 꺼내와서 처리하면 된다.

 

큐의 사이즈에 제한이 걸려 있고 큐가 꽉 찬 경우엔 Add 함수에서 대기가 걸리고, 큐가 비어 있는 경우엔 Take에서 대기가 걸리기 때문에 CPU 낭비가 없다.

 

스레드를 중단하고 싶으면 CompleteAdding 함수만 한번 호출하면 된다. 이후에 Add와 Take에서 예외가 발생하기 때문에 예외만 잘 처리해 주면 된다.

 

소비자 스레드는 큐에 데이터가 남아 있으면 다 소비할 때까지 계속 도는데, CancellationToken을 이용해서 강제로 취소할 수도 있다.

 

 그외 TryAdd, TryGet 등 대기가 없는 함수들도 있는데, 앞서 6번에서 언급한 스레드에서 고속으로 데이터를 처리해야 하는 경우에 사용하면 된다. 넌블로킹 처리에 대해 자세한 내용을 알고 싶으면 다음의 링크를 참조하면 된다.

참고 링크 : docs.microsoft.com/ko-kr/dotnet/standard/collections/thread-safe/how-to-add-and-take-items (두번째 예제)

 

데이터의 처리 순서가 중요하지 않으면 AddAny, TakeAny 등의 함수를 사용하면 될 것으로 보이는데, 인자로 BlockingCollection 배열이 필요하다. 이것 또한 내용이 복잡해 보여서 이 글에서는 다루지 않겠다.

 

다음은 본인이 BlockingCollection을 사용한 생산자 소비자 패턴 예제이다. (c# 콘솔 프로젝트이고, 비주얼 스튜디오 2015로 작성하였다.)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Collections;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace CSBlockingCollection
{
	public class MyProduceConsumeQueue
	{
		// 추가/추출된 데이터 개수
		private int _addCount = 0;
		private int _takeCount = 0;

		// 블로킹 컬렉션 (내부적으론 ConcurrentQueue 사용)
		private BlockingCollection<int> _queue = new BlockingCollection<int>();

		// 최대 사이즈 제한을 걸수도 있다. 사이즈 제한이 있는 경우 큐가 가득 차면, 생산자가 대기해야 한다.
		//private BlockingCollection<int> _queue = new BlockingCollection<int>(10);

		// 추출 취소를 위한 토큰
		private CancellationTokenSource _source = new CancellationTokenSource();


		// 락을 위한 오브젝트
		public object LockObj = new object();

		// 큐 사이즈
		public int Count { get { return _queue.Count; } }

		// 추가된 데이터 개수
		public int AddedCount { get { return _addCount; } }

		// 추출한 데이터 개수
		public int TakenCount { get { return _takeCount; } }
		
		// 데이터 추가 중단
		public void CompleteAdding() { _queue.CompleteAdding(); }

		// 데이터 큐가 완전히 비었는지 검사
		public bool IsCompleted() { return _queue.IsCompleted; }

		// 데이터 추출 중단
		public void CancelTake() { _source.Cancel(); }


		// 데이터 추가
		public bool Add(int data)
		{
			try
			{
				_queue.Add(data);

				//++_addCount;
				Interlocked.Increment(ref _addCount);

				return true;
			}
			catch (Exception e)
			{
				// CompleteAdding를 호출하면 여기서 예외가 발생한다.
				Console.WriteLine(e.Message);
				return false;
			}
		}

		// 데이터 추출
		public bool Take(ref int data)
		{
			try
			{
				data = _queue.Take(_source.Token);

				//++_takeCount;
				Interlocked.Increment(ref _takeCount);

				return true;
			}
			catch (Exception e)
			{
				// CancelTake를 호출하면 여기서 예외가 발생한다.
				Console.WriteLine(e.Message);
				return false;
			}
		}

		// 큐 내용 출력하기 - 디버깅용
		public void PrintContents()
		{
			Console.Write($"[Queue] Add({AddedCount}), Take({TakenCount}), Count({_queue.Count}) => ");

			foreach (int item in _queue)
			{
				Console.Write("{0} ", item);
			}

			Console.WriteLine("");
		}
	}

	// 생산자 소비자 공통 부모용 클래스
	public abstract class ProducerConsumerBase
	{
		// 데이터 하나 처리에 필요한 최소, 최대 시간 시뮬레이션 값 (ms 단위)
		int _minProcessTime;
		int _maxProcessTime;

		// 랜덤
		protected Random _random = new Random();

		// 데이터 전달용 큐
		protected MyProduceConsumeQueue _queue;

		// 스레드 아이디
		public int ThreadId { get; private set; }

		// 데이터 처리된 개수
		public int ProcessedCount { get; set; }


		// 생성자
		public ProducerConsumerBase(MyProduceConsumeQueue q, int minProcessTime, int maxProcessTime)
		{
			_queue = q;

			_minProcessTime = minProcessTime;
			_maxProcessTime = maxProcessTime;
		}

		// 스레드 시작시 호출
		protected void OnThreadStart()
		{
			ThreadId = Thread.CurrentThread.ManagedThreadId;
		}

		// 스레드 잠시 대기 (데이터 처리 시뮬레이션용)
		protected void ThreadWait()
		{
			Thread.Sleep(_random.Next(_minProcessTime, _maxProcessTime));
		}

		// 스레드 함수
		public abstract void ThreadRun();
	}


	// 생산자
	public class Producer : ProducerConsumerBase
	{
		// 생성자
		public Producer(MyProduceConsumeQueue q, int minProcessTime, int maxProcessTime)
			: base(q, minProcessTime, maxProcessTime)
		{}

		// 스레드 함수
		public override void ThreadRun()
		{
			// 스레드 시작 처리
			OnThreadStart();

			//Stopwatch stopwatch = new Stopwatch();

			while(true)
			{
				// 랜덤 데이터 생성
				int data = _random.Next(0, 100);

				//stopwatch.Restart();

				// 데이터를 큐에 추가. 스레드에 안전하다.
				if (_queue.Add(data) == false)
					break;

				//stopwatch.Stop();

				// 대기시간이 있을 경우 표시
				//if (stopwatch.ElapsedMilliseconds != 0)
					//Console.WriteLine($"[{ThreadId:D2}] Produce Add Time : {stopwatch.ElapsedMilliseconds} ms");

				// 처리 카운트 증가
				++ProcessedCount;

				// 디버깅용 콘솔 출력 부분 (락을 걸어야 콘솔 출력이 깨지지 않는다.)
				// Add 와 별도로 락을 걸었기 때문에 출력되는 큐의 내용은 정확하지 않을 수 있다.
				lock (_queue.LockObj)
				{
					Console.Write($"[{ThreadId:D2}] Produce ({data:D2}) => ");
					_queue.PrintContents();
				}

				// 스레드 잠시 대기
				ThreadWait();
			}

			// 생산자 결과 출력
			Console.WriteLine($"[{ThreadId:D2}] Produced {ProcessedCount} items");
		}
	}

	// 소비자
	public class Consumer : ProducerConsumerBase
	{
		// 생성자
		public Consumer(MyProduceConsumeQueue q, int minProcessTime, int maxProcessTime)
			: base(q, minProcessTime, maxProcessTime)
		{ }

		// 스레드 함수
		public override void ThreadRun()
		{
			// 스레드 시작 처리
			OnThreadStart();

			while (true)
			{
				int data = 0;

				// 큐에서 데이터 하나 추출. 스레드에 안전하다.
				if (_queue.Take(ref data) == false)
					break;

				// 처리 카운트 증가
				++ProcessedCount;

				// 디버깅용 콘솔 출력 부분 (락을 걸어야 콘솔 출력이 깨지지 않는다.)
				// Take 와 별도로 락을 걸었기 때문에 출력되는 큐의 내용은 정확하지 않을 수 있다.
				lock (_queue.LockObj)
				{
					Console.Write($"[{ThreadId:D2}] Consume ({data:D2}) => ");
					_queue.PrintContents();
				}

				// 스레드 잠시 대기
				ThreadWait();
			}

			// 소비자 결과 출력
			Console.WriteLine($"[{ThreadId:D2}] Consumed {ProcessedCount} items", ProcessedCount);
		}
	}

	// 샘플 클래스
	public class ThreadSyncSample
	{
		static void Main()
		{
			Console.WriteLine("Configuring worker thread...");
			
			MyProduceConsumeQueue queue = new MyProduceConsumeQueue();

			// 생산자 정의
			Producer[] producerList = new[]
			{
				new Producer(queue, 100, 300),
				new Producer(queue, 200, 400),
			};

			// 소비자 정의 - 처리 속도를 느리게 하면 큐가 점점 쌓인다.
			Consumer[] consumerList = new[]
			{
				//new Consumer(queue, 100, 300),
				//new Consumer(queue, 100, 300),
				new Consumer(queue, 100, 300),				
				new Consumer(queue, 200, 600),
			};

			// 생산자 소비자 태스크 정의
			var producerTasks = new Task[producerList.Length];
			var consumerTasks = new Task[consumerList.Length];

			// 생산자 생성
			for (int i = 0; i < producerTasks.Length; ++i)
				producerTasks[i] = new Task(producerList[i].ThreadRun);

			// 소비자 생성
			for (int i = 0; i < consumerTasks.Length; ++i)
				consumerTasks[i] = new Task(consumerList[i].ThreadRun);


			Console.WriteLine("Launching producer and consumer threads...");
			
			// 생산자 태스크 실행
			Array.ForEach(producerTasks, t => t.Start());

			// 소비자 태스크 실행
			Array.ForEach(consumerTasks, t => t.Start());


			// ESC 키를 누르면 생산자를 중단한다는 안내
			Console.WriteLine("Press ESC to stop producers");

			// 키 입력이 있는가?
			while (true)
			{
				if (Console.KeyAvailable)
				{
					// 입력된 키가 ESC인가?
					var keyInfo = Console.ReadKey();
					if (keyInfo.Key == ConsoleKey.Escape)
					{
						// 생산자 스레드 중단 요청
						Console.WriteLine("Signaling producer threads to terminate...");
						queue.CompleteAdding();
						break;
					}
				}
			}

			// 생산자 스레드 종료 대기
			Task.WaitAll(producerTasks);


			// ESC 키를 누르면 소비자를 중단한다는 안내
			Console.WriteLine("Press ESC to stop consumers");

			// 큐가 빌 때까지 계속 대기
			while (queue.IsCompleted() == false)
			{
				if (Console.KeyAvailable)
				{
					// 입력된 키가 ESC인가?
					var keyInfo = Console.ReadKey();
					if (keyInfo.Key == ConsoleKey.Escape)
					{
						// 소비자 스레드 중단 요청
						Console.WriteLine("Signaling consumer threads to terminate...");
						queue.CancelTake();
						break; // while 빠져나가기
					}
				}
			}

			// 소비자 스레드 종료 대기
			Task.WaitAll(consumerTasks);

			Console.WriteLine("========================================");

			// 전체 생산량 계산
			int totalProduced = 0;
			foreach (var item in producerList)
			{
				Console.WriteLine($"[{item.ThreadId:D2}] Produced count : {item.ProcessedCount}");
				totalProduced += item.ProcessedCount;
			}

			// 전체 소비량 계산
			int totalConsumed = 0;
			foreach (var item in consumerList)
			{
				Console.WriteLine($"[{item.ThreadId:D2}] Consumed count : {item.ProcessedCount}");
				totalConsumed += item.ProcessedCount;
			}

			// 결과 출력
			Console.WriteLine($"Total Produced count : {totalProduced}");
			Console.WriteLine($"Total Consumed count : {totalConsumed}");
			Console.WriteLine($"Queue count : {queue.Count}");
			Console.WriteLine($"Queue add count : {queue.AddedCount}");
			Console.WriteLine($"Queue take count : {queue.TakenCount}");

			// 결과 검증 코드
			if (queue.AddedCount != totalProduced)
				Console.WriteLine($"ERROR : _queue.AddCount != totalProduced");

			if (queue.TakenCount != totalConsumed)
				Console.WriteLine($"ERROR : _queue.TakeCount != totalConsumed");

			if (queue.Count != (totalProduced - totalConsumed))
				Console.WriteLine($"ERROR : _queue.Count != (totalProduced - totalConsumed)");

			// 종료 대기
			Console.WriteLine("Press ENTER to exit.");
			Console.ReadLine();
		}
	}
}

예제에서는 생산자와 소비자 스레드가 각각 2개씩 돌아가도록 되어 있고, 각자의 처리 속도도 조절할 수 있게 했다. 스레드 개수도 조절이 가능하다.

 

일부러 소비자 속도를 좀 더 느리게 해서 시간이 지날수록 큐가 커지게 해 놓았는데, 큐에 남은 데이터를 끝까지 처리할 수 있는지 확인하기 위함이다. 소비자 스레드를 빠르게 설정할 수도 있는데, 그러면 소비자 스레드의 대기 시간이 길어진다.

 

예제 실행 화면은 다음과 같다.

 

ESC를 한번 누르면 생산자가 먼저 중단되고, 큐에 남은 데이터를 소비자가 마저 처리를 하는데, 이때 ESC를 한번 더 누르면 소비자도 강제 중단할 수 있다.

 

이 예제에서는 생산자를 먼저 중단하고 소비자를 나중에 종료하는 방식으로 처리했는데, 소비자를 먼저 중단하고 생산자를 중단하려면 Add 함수에도 CancellationToken이 필요할 수도 있다.

 

+ Recent posts