sql >> Database >  >> RDS >> Database

Service Broker configureren voor asynchrone verwerking

In mijn laatste artikel heb ik gesproken over de voordelen van het implementeren van asynchrone verwerking met Service Broker in SQL Server ten opzichte van de andere methoden die bestaan ​​voor ontkoppelde verwerking van lange taken. In dit artikel gaan we door alle componenten die moeten worden geconfigureerd voor een basis Service Broker-configuratie in een enkele database, en de belangrijke overwegingen voor het gespreksbeheer tussen broker-services. Om te beginnen, moeten we een database maken en de database inschakelen voor gebruik door Service Broker:

MAAK DATABASE AsyncProcessingDemo;GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name =N'AsyncProcessingDemo') =0BEGIN WIJZIG DATABASE AsyncProcessingDemo SET ENABLE_BROKER;ENDGOpreUSE AsyncProcess; 

Brokercomponenten configureren

De basisobjecten die in de database moeten worden gemaakt, zijn de berichttypen voor de berichten, een contract dat bepaalt hoe de berichten tussen de services worden verzonden, een wachtrij en de initiatorservice, en een wachtrij en de doelservice. Veel voorbeelden online voor service broker tonen complexe objectnaamgeving voor de berichttypen, contracten en services voor Service Broker. Er is echter geen vereiste dat de namen complex zijn en eenvoudige objectnamen kunnen voor elk van de objecten worden gebruikt.

Voor de berichten moeten we een berichttype voor het verzoek maken, dat AsyncRequest zal heten. , en een berichttype voor het resultaat, dat AsyncResult . zal heten . Beiden gebruiken XML die wordt gevalideerd als correct gevormd door de brokerservices om de door de services vereiste gegevens te verzenden en te ontvangen.

-- Maak de berichttypesCREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MESSAGE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;

Het contract specificeert dat de AsyncRequest door de initiërende service naar de doelservice wordt verzonden en dat de doelservice een AsyncResult retourneert bericht terug naar de initiërende service. Het contract kan ook meerdere berichttypen specificeren voor de initiator en het doel, of dat een specifiek berichttype door elke service kan worden verzonden, als de specifieke verwerking dit vereist.

-- Maak het contractCREATE CONTRACT [AsyncContract] ( [AsyncRequest] VERZONDEN DOOR INITIATOR, [AsyncResult] VERZONDEN DOOR TARGET);

Voor elk van de services moet een wachtrij worden gemaakt om de berichten die door de service worden ontvangen, op te slaan. De doelservice waarnaar het verzoek wordt verzonden, moet worden gemaakt met vermelding van het AsyncContract om berichten naar de dienst te kunnen verzenden. In dit geval heet de service ProcessingService en wordt aangemaakt in de ProcessingQueue binnen de databank. Voor de initiërende service hoeft geen contract te worden opgegeven, waardoor deze alleen berichten kan ontvangen als reactie op een gesprek dat ermee is gestart.

-- Maak de verwerkingswachtrij en service - specificeer het contract om verzending naar de service toe te staan. CREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Maak de aanvraagwachtrij en service CREATE QUEUE RequestQueue;CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Een bericht verzenden voor verwerking

Zoals ik in het vorige artikel heb uitgelegd, geef ik er de voorkeur aan om een ​​in de wrapper opgeslagen procedure te implementeren voor het verzenden van een nieuw bericht naar een brokerservice, zodat het eenmalig kan worden gewijzigd om de prestaties indien nodig te schalen. Deze procedure is een eenvoudige wrapper rond het maken van een nieuw gesprek en het verzenden van het bericht naar de ProcessingService .

-- Maak de wrapper-procedure voor het verzenden van berichtenCREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; BEGIN TRANSACTIE; BEGIN DIALOOGGESPREK @conversation_handle VAN SERVICE @FromService NAAR SERVICE @ToService OP CONTRACT @Contract MET ENCRYPTIE =UIT; VERZENDEN IN GESPREK @conversation_handle TYPE BOODSCHAP @MessageType(@MessageBody); TRANSACTIE COMMIT;ENDGO

Met behulp van de in de wrapper opgeslagen procedure kunnen we nu een testbericht sturen naar de ProcessingService om te valideren dat we de makelaarsdiensten correct hebben ingesteld.

-- Stuur een requestEXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Controleer op bericht in de verwerkingswachtrij SELECT CAST (message_body AS XML) FROM ProcessingQueue;GO

Berichten verwerken

Terwijl we de berichten van de ProcessingQueue . handmatig konden verwerken , willen we waarschijnlijk dat de berichten automatisch worden verwerkt wanneer ze worden verzonden naar de ProcessingService . Om dit te doen, moet een opgeslagen activeringsprocedure worden gemaakt die we zullen testen en later aan de wachtrij zullen binden om de verwerking bij activering van de wachtrij te automatiseren. Om een ​​bericht te verwerken moeten we RECEIVE het bericht uit de wachtrij binnen een transactie, samen met het berichttype en de gesprekshandle voor het bericht. Het berichttype zorgt ervoor dat de juiste logica wordt toegepast op het bericht dat wordt verwerkt, en de gesprekshandle zorgt ervoor dat een antwoord kan worden teruggestuurd naar de initiërende service wanneer het bericht is verwerkt.

De RECEIVE Met de opdracht kan een enkel bericht of meerdere berichten binnen dezelfde gesprekshandle of -groep in één transactie worden verwerkt. Om meerdere berichten te verwerken, moet een tabelvariabele worden gebruikt, of om een ​​enkel bericht te verwerken, kan een lokale variabele worden gebruikt. De onderstaande activeringsprocedure haalt een enkel bericht op uit de wachtrij, controleert het berichttype om te bepalen of het een AsyncRequest is bericht en voert vervolgens het langlopende proces uit op basis van de ontvangen berichtinformatie. Als het geen bericht binnen de lus ontvangt, wacht het tot 5000 ms, of 5 seconden, totdat een ander bericht de wachtrij binnenkomt voordat het de lus verlaat en de uitvoering ervan beëindigt. Na het verwerken van een bericht, bouwt het een AsyncResult bericht en stuurt het terug naar de initiator op dezelfde gesprekshandle als waarvan het bericht is ontvangen. De procedure controleert ook het berichttype om te bepalen of een EndDialog of Error bericht is ontvangen om het gesprek op te schonen door het te beëindigen.

-- Maak verwerkingsprocedure voor verwerking wachtrijCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN STEL NOCOUNT IN; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; VERKLAREN @message_body XML; VERKLAREN @message_type_name sysname; TERWIJL (1=1) BEGIN BEGIN TRANSACTIE; WACHTEN (BOVENAAN ONTVANGST (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM ProcessingQueue), TIMEOUT 5000; ALS (@@ROWCOUNT =0) BEGIN MET ROLLBACK-TRANSACTIE; PAUZE; END IF @message_type_name =N'AsyncRequest' BEGIN -- Handel hier complexe lange verwerking af -- Voor demonstratie trekken we het accountnummer en sturen we alleen een antwoord DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]', 'INT'); -- Stel een antwoordbericht samen en stuur het terug DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber AS NVARCHAR (11)) + ' '; VERZENDEN IN GESPREK @conversation_handle BERICHT TYPE [AsyncResult] (@reply_message_body); END -- Als het dialoogbericht wordt beëindigd, beëindigt u het dialoogvenster ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN EINDE CONVERSATIE @conversation_handle; EINDE -- Indien foutmelding, log en beëindig het gesprek ANDERS IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log de foutcode in en voer de vereiste afhandeling hier uit -- End het gesprek voor de fout END CONVERSATION @conversation_handle; EINDE COMMIT TRANSACTIE; ENDENDGO

De RequestQueue zal ook de berichten moeten verwerken die ernaar worden verzonden, dus een extra procedure voor het verwerken van het AsyncResult berichten die door de ProcessingQueueActivation-procedure worden geretourneerd, moeten worden gemaakt. Aangezien we weten dat het AsnycResult-bericht betekent dat al het verwerkingswerk is voltooid, kan het gesprek worden beëindigd zodra we dat bericht hebben verwerkt, waardoor een EndDialog-bericht naar de ProcessingService wordt verzonden, dat vervolgens wordt verwerkt door de activeringsprocedure om de gesprek om alles op te ruimen en het vuur te vermijden en vergeet de problemen die optreden als gesprekken correct worden beëindigd.

-- Maak een procedure voor het verwerken van antwoorden op de aanvraagwachtrijCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN STEL NOCOUNT IN; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; VERKLAREN @message_body XML; VERKLAREN @message_type_name sysname; TERWIJL (1=1) BEGIN BEGIN TRANSACTIE; WACHTEN ( ONTVANG TOP (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM RequestQueue), TIMEOUT 5000; ALS (@@ROWCOUNT =0) BEGIN MET ROLLBACK-TRANSACTIE; PAUZE; END IF @message_type_name =N'AsyncResult' BEGIN -- Indien nodig behandel het antwoordbericht hier DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Aangezien dit al het werk is dat gedaan wordt, beëindigt u het gesprek om het EndDialog-bericht te verzenden END CONVERSATION @conversation_handle; END -- Als het dialoogbericht wordt beëindigd, beëindigt u het dialoogvenster ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN EINDE CONVERSATIE @conversation_handle; EINDE -- Als foutmelding, log en beëindig het gesprek ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN EINDE GESPREK @conversation_handle; EINDE COMMIT TRANSACTIE; ENDENDGO

De procedures testen

Voordat we de wachtrijverwerking voor onze services automatiseren, is het belangrijk om de activeringsprocedures te testen om ervoor te zorgen dat ze de berichten op de juiste manier verwerken en om te voorkomen dat een wachtrij wordt uitgeschakeld als er een fout optreedt die niet correct wordt afgehandeld. Aangezien er al een bericht is in de ProcessingQueue de ProcessingQueueActivation procedure kan worden uitgevoerd om dat bericht te verwerken. Houd er rekening mee dat de WAITFOR zal ervoor zorgen dat de procedure 5 seconden duurt om te beëindigen, ook al wordt het bericht onmiddellijk vanuit de wachtrij verwerkt. Nadat we het bericht hebben verwerkt, kunnen we controleren of de procedure correct werkte door de RequestQueue . op te vragen om te zien of een AsyncResult bericht bestaat, en dan kunnen we verifiëren dat de RequestQueueActivation procedure correct werkt door het uit te voeren.

-- Verwerk het bericht uit de verwerkingswachtrijEXECUTE dbo.ProcessingQueueActivation;GO -- Controleer op antwoordbericht op verzoek wachtrijSELECT CAST(message_body AS XML) FROM RequestQueue;GO -- Verwerk het bericht uit de verzoekwachtrijEXECUTE dbo.RequestQueueActivation;GO 

De verwerking automatiseren

Op dit moment zijn alle componenten voltooid om onze verwerking volledig te automatiseren. Het enige dat overblijft is om de activeringsprocedures aan hun juiste wachtrijen te binden en vervolgens een ander testbericht te sturen om te valideren dat het wordt verwerkt en dat er daarna niets meer in de wachtrijen staat.

-- Wijzig de verwerkingswachtrij om interne activering op te gevenALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS =ON, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, EXECUTE AS SELF );GO -- Wijzig de aanvraagwachtrij om interne activering op te gevenALTER QUEUE RequestQueu MET ACTIVERING (STATUS =AAN, PROCEDURE_NAME =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, UITVOEREN ALS ZELF);GO -- Test automatische activering-- Stuur een verzoek UITVOEREN dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N' ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Controleer op bericht in de verwerkingswachtrij -- er is niets omdat het automatisch is verwerkt SELECT CAST (message_body AS XML) FROM ProcessingQueue; GO -- Controleer op antwoordbericht in de wachtrij voor verzoeken -- er is niets omdat het automatisch werd verwerkt SELECT CAST (bericht_body AS XML) VANUIT RequestQueue;GO

Samenvatting

De basiscomponenten voor geautomatiseerde asynchrone verwerking in SQL Server Service Broker kunnen worden geconfigureerd in een enkele databaseconfiguratie om ontkoppelde verwerking van langlopende taken mogelijk te maken. Dit kan een krachtig hulpmiddel zijn om de applicatieprestaties te verbeteren, vanuit de ervaring van een eindgebruiker, door de verwerking los te koppelen van de interacties van de eindgebruiker met de applicatie.


  1. Hoe Prisma te gebruiken

  2. psql:FATAL:Peer-authenticatie mislukt voor gebruikersdev

  3. efficiënte manier om paging te implementeren

  4. Voeg een procentteken toe aan een getal in MariaDB