Filas em Java – Case de BlockingQueue

Esses dias estive com o um colega de trabalho, que recebeu uma missão de corrigir um problema em uma determinada aplicação. A funcionalidade problemática era multithread… muchas threads… very threads… centenas…

O cenário era de vários produtores para um único consumidor e o que estava sendo feito era o seguinte:

  • Criação de um pool de threads a cada chamada do método em questão
  • Start de cada thread do pool dentro de um laço
  • Encadeamento de threads não finalizadas

E essa situação gerava uma sobrecarga nos recursos do servidor, o que resultava no nosso já conhecido erro…

ERRO 500, pra quem não pegou a referência! hahaha’

O problema do multithread

A programação paralela com threads é extremamente útil em diversos cenários que demandam velocidade de execução, compartilhamento de código entre várias rotinas que estão em andamento e prioridades de processamento. E com soluções cada vez mais robustas hoje em dia e que trabalham com uma quantidade enorme de dados, isso é cada vez mais comum. Porém, o uso de Threads em um projeto pode ser tanto a solução, quanto o calcanhar de Aquiles do mesmo.

O paralelismo e concorrência, se mal utilizados e tratados podem causar erros de sincronização, lentidão (locks) e dependendo do foco da aplicação, duplicidades indesejadas. Além de retenção de recursos da máquina onde o sistema está hospedado.

Pois bem, então a gente se pergunta: o que fazer para desenvolver com mais segurança em cenários desse tipo?

Thread-safety

Um conceito já antigo e que muito provavelmente você já ouviu falar é o thread-safety, que nada mais é do que a garantia de execução segura de uma tarefa em um ambiente multithread. Algo é tido como Thread-Safe se, e somente se, tenha essa premissa de total segurança como garantia.

Filas e a interface BlockingQueue

A fila, a grosso modo, é uma estrutura semelhante as listas, é uma coleção de coisas que possui basicamente duas ações: inserir e remover. Mas uma grande diferença entre listas e filas é a possibilidade (nas listas) de inserir e remover objetos que não são das extremidades, mas sim do meio da lista. As filas não permitem a inserção e remoção de “inter-registros”, a adição é feita no fim da fila e a remoção sempre do início.

Outra coisa importante sobre filas, são os tipos: circulares, prioritárias, limitadas e ilimitadas, com bloqueio e sem bloqueio, estáticas e dinâmicas… é fila que não acaba mais!

Mas vamos ao foco, o nosso problema inicial e a Interface BlockinQueue

Relembrando o cenário:

Simulação de cenário multithread com vários produtores e um único consumidor

Na ilustração acima, temos um exemplo do que estava acontecendo e do que teria que ser resolvido. Imaginem as tasks como tarefas de gravação, por exemplo. E o consumer como um disco onde alguma informação será gravada por cada task. É de se esperar que dependendo do tempo de execução de cada tarefa, essa rotina leve o servidor a exaustão de recursos, pois as execuções vão se encavalar a todo momento, consumindo processamento, memória e os demais recursos da máquina até que se tenha um erro, como já falado, 500. Erro esse que pode ser causado por quantidade de memória insuficiente para a demanda, o também conhecido OutOfMemory.

Agora imaginem a situação a seguir:

Simulação de fila com um produtor e um consumidor

A situação acima mostra o esquemático de uma fila, que também pode gerar problemas, inclusive o anteriormente citado OutOfMemory. Caso a fila em questão seja do tipo ilimitada, ao adicionar vários e vários itens o servidor pode não dispor de memória suficiente para comportá-la e certamente lançará o erro de falta de memória. Mas aí é que vem o pulo do gato nas filas! As Blocking Queues.

Gatinhos hipinóticos para você não esquecer as filas

A interface BlockingQueue está presente desde a versão 1.5 do Java e tem várias implementações e todas elas são thread-safe, ou seja, seguras para ambientes multithread. E pegando o gancho das ilustrações anteriores, um único caminho para os dados utilizando uma fila de bloqueio, pode ser menos oneroso para os recursos computacionais e até mais rápido no tempo de gravação do que várias threads executando a tarefa ao mesmo tempo.

Aí você deve tá se perguntando: o que a fila de bloqueio faz tanto que ajuda em todo esse controle e rapidez? BLOQUEIA, ora! haha’

Brincadeiras a parte, como citei, existem várias implementações para a interface de Block: LinkedBlockingQueue, ConcurrentBlockingQueue, PriorityBlockingQueue… entre outras. As diferenças entre elas passam desde a limitação ou não do tamanho, até as prioridades de execução que podem ser dadas.

Mas beleza Dhellano e o problema? Como foi resolvido?

Pois bem, ao invés de várias threads, agora são criadas apenas duas: PRODUTORA e CONSUMIDORA. A thread produtora ou producer é responsável por alimentar uma coleção do tipo LinkedBlockingQueue com os parâmetros das requisições que precisam ser feitas para o método que gerava o problema (vide cenário no início do artigo) e a thread consumidora ou consumer é responsável por consumir a LinkedBlockingQueue e realizar as chamadas ao tal método. Mas isso agora é feito de modo auto-coordenado, pois nas BlockingQueues só entram parâmetros novos na fila quando os antigos vão sendo consumidos e retirados da mesma. Ou seja, o processamento de um registro tem que ser finalizado para que um novo ocupe o seu lugar na coleção e seja também executado (essa remoção e adição acontece da frente pra trás). A fila estando cheia, ninguém entra… o registro é BLOQUEADO… até que um novo espaço surja. E assim se garantiu a ordem, otimização de recursos e gestão da concorrência para uma melhor integridade dos dados.

Implementação

O código é composto essencialmente de 3 classes: A classe que cria os produtores, a de criação dos consumidores e a de start do processo de produção e consumo. Eu criei um projeto SpringBoot, só por conta do costume hehe’, mas você pode criar um projeto Java simples. Segue abaixo o código implementado e a explicação, caso você queira testar e verificar se ele se encaixa em alguma situação que você já passou ou esta passando:

A primeira classe a ser mostrada aqui é a produtora. É a partir dela que as mensagens são geradas e colocadas na fila de bloqueio.

package br.com.dhecastro.queueblockapp;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {

	private final BlockingQueue<Integer> numbersQueue;
	private final Integer stopThread;
	private final int stopThreadPerProducer;

	Producer(BlockingQueue<Integer> numbersQueue, Integer stopThread, int stopThreadPerProducer) {
		this.numbersQueue = numbersQueue;
		this.stopThread = stopThread;
		this.stopThreadPerProducer = stopThreadPerProducer;
	}

	public void run() {
		try {
			generateNumbers();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	private void generateNumbers() throws InterruptedException {
		for (int i = 0; i < 100; i++) {
			numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
		}
		for (int j = 0; j < stopThreadPerProducer; j++) {
			numbersQueue.put(stopThread);
		}
	}
}

A classe produtora possui uma estrutura bem simples. Ela é composta por um construtor que recebe como parâmetros uma BlockingQueue, um atributo chamado stopThread e outro chamado stopThreadPerProducer. Esses dois últimos parâmetros não foram citados até agora, mas calma, vou explicá-los:

As threads consumidoras precisam de um sinal que as faça parar de esperar algo para consumir e esse é um conceito conhecido como “pílulas de veneno”, o que no caso aqui chamei de stopThread. O sinal deve ser o último elemento da fila e assim que o consumidor o receber, irá verificar aquele valor e imediatamente encerrar o consumo. O stopThreadPerProducer, como o próprio nome diz, é a quantidade de sinais (stopThread) por thread produtora.

A fórmula para calcular o número de stops por produtor é:

NÚMERO DE PRODUTORES / NÚMERO DE CONSUMIDORES

Continuando no código, vemos que a classe Producer implementa Runnable, e por isso deve implementar o método run(). E o método run (no nosso exemplo) não faz nada mais do que gerar 100 números aleatórios para alimentar nosso lista de bloqueio. Fim da nossa classe produtora e vamos a consumidora!

A segunda classe do nosso exemplo é a Consumer. E segue o código abaixo:

package br.com.dhecastro.queueblockapp;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	private final BlockingQueue<Integer> queue;
	private final Integer stopThread;

	Consumer(BlockingQueue<Integer> queue, Integer stopThread) {
		this.queue = queue;
		this.stopThread = stopThread;
	}

	public void run() {
		try {
			while (true) {

				Integer result = queue.take();
				if (result.equals(stopThread)) {
					return;
				}

				//uncomment to see the queue being consumed and returning to maximum at the predetermined size
				/** System.out.println("QUEUE SIZE ::::::::::::::::::::::::::::::: " + queue.size()); */
				
				System.out.println(Thread.currentThread().getName() + " result: " + result);

			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}

A classe/thread consumidora é tão simples quando a produtora. Ela recebe os mesmos parâmetros no construtor, com exceção do stopThreadPerProducer, mas até a lista é a mesma, pois as threads compartilham o contexto (essa informação fica pela conta óbvio hehe’, já que uma thread vai consumir a fila gerada pela outra).

Aqui também notamos a implementação do método run, vindo de Runnable. E o que é feito nele?

A classe de consumo recebe a lista de bloqueio já preenchida pelas classes produtoras e começa então a processar os dados. Note a verificação do stopThread (anteriormente explicado) antes de qualquer processamento.

O nosso exemplo é bem simples, através de um println() estamos apenas mostrando o nome da thread e o dado que foi recuperado da fila. Mas aqui os testes podem variar para um melhor entendimento do funcionamento, repare no comentário na classe… o trecho de código, se descomentado, vai printar a fila diminuindo e crescendo ao passo em que ela é alimentada e consumida. As vezes ela está com tamanho 9, baixa para 8, 7, sobe para 9 novamente, baixa para 5, volta para 10 e assim sucessivamente até zerar. Veja:

BlockingQueue sendo alimentada e consumida durante o processo

Código da classe de consumo apresentado, chegamos a classe de disparo do processo com a criação dos produtores e consumidores, a SimulationBlockQueue. Lembrando que esses foram os nomes que eu dei, mas você pode dar o nome que quiser, hein? Usei esses para fins de conceito no exemplo. 😉

Segue o código da nossa última classe:

package br.com.dhecastro.queueblockapp;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SimulationBlockQueue {
	public static void main(String[] args) {
	        
		//Config Queue Variables 
		int queue_max_size = 10;
        int producers_number = 1;
        int consumers_number = 1;
        int stop_thread = Integer.MAX_VALUE;
        int stop_thread_per_producer = consumers_number / producers_number;
        
        //Create queue
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(queue_max_size);

        //Create producers
        for (int i = 0; i < producers_number; i++) {
            new Thread(new Producer(queue, stop_thread, stop_thread_per_producer)).start();
        }

        //Create consumers
        for (int j = 0; j < consumers_number; j++) {
            new Thread(new Consumer(queue, stop_thread)).start();
        }
	}
}

A última e também a mais simples! A nossa classe possui um método main para que possamos executá-la, os parâmetros de configuração das threads, a criação da fila que será compartilhada no processo e por fim um for para criar cada produtor e consumidor. As variáveis de configuração no exemplo são:

  • producers_number: número de produtores que você deseja associar ao processo.
  • consumers_number: número do consumidores que irão recuperar e processar os dados da fila (gerados pelos produtores).
  • stop_thread: pílula de veneno escolhida para o exemplo. Caso lembrem, a thread produtora vai gerar números aleatórios de 0 a 100 (vide explicação da classe Producer), sendo assim, Integer.MAX_VALUE será o nosso stopThread (elemento estranho) e assim que detectado, o consumo será encerrado.
  • stop_thread_per_producer: como já explicaod, número de stops pro produtor.

Agora sim, tendo implementado as 3 classes, clique com o botão direito do mouse sobre a classe SimulationBlockQueue, que possui o método main. Em seguida no menu suspenso que será aberto, clique na opção Run As e logo depois em Java Application. A saída deve ser como a apresentada abaixo (com exceção da ordem dos números, que são gerados de forma aleatória):

Exemplo de execução bem sucedida da nossa aplicação

O exemplo foi baseado em um artigo do site Baeldung, que fica de dica para vocês leitores. O site é excelente e possui muito conteúdo de Java e vários outros assuntos relativos a desenvolvimento.

E é isso… ufa! Chegamos ao fim da explicação do nosso exemplo, com isso já é possível reproduzir o que foi mostrado e entender melhor a solução proposta do problema apresentado no início do artigo.


Caso queira tirar alguma dúvida, bater um papo sobre tecnologia e desenvolvimento ou propor uma correção/melhoria no conteúdo, aqui tem os meus contatos: http://dhecastro.com.br

Mas se quiser só colocar o assunto em prática, segue o link do projetinho no github: https://github.com/DheCastro/queueblockapp

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *