In mijn laatste artikel heb ik gesproken over de voordelen van het implementeren van asynchrone verwerking met Service Broker in SQL Server ten opzichte van de andere methoden die bestaan voor ontkoppelde verwerking van lange taken. In dit artikel gaan we door alle componenten die moeten worden geconfigureerd voor een basis Service Broker-configuratie in een enkele database, en de belangrijke overwegingen voor het gespreksbeheer tussen broker-services. Om te beginnen, moeten we een database maken en de database inschakelen voor gebruik door Service Broker:
MAAK DATABASE AsyncProcessingDemo;GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name =N'AsyncProcessingDemo') =0BEGIN WIJZIG DATABASE AsyncProcessingDemo SET ENABLE_BROKER;ENDGOpreUSE AsyncProcess;Brokercomponenten configureren
De basisobjecten die in de database moeten worden gemaakt, zijn de berichttypen voor de berichten, een contract dat bepaalt hoe de berichten tussen de services worden verzonden, een wachtrij en de initiatorservice, en een wachtrij en de doelservice. Veel voorbeelden online voor service broker tonen complexe objectnaamgeving voor de berichttypen, contracten en services voor Service Broker. Er is echter geen vereiste dat de namen complex zijn en eenvoudige objectnamen kunnen voor elk van de objecten worden gebruikt.
Voor de berichten moeten we een berichttype voor het verzoek maken, dat
AsyncRequest
zal heten. , en een berichttype voor het resultaat, datAsyncResult
. zal heten . Beiden gebruiken XML die wordt gevalideerd als correct gevormd door de brokerservices om de door de services vereiste gegevens te verzenden en te ontvangen.-- Maak de berichttypesCREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MESSAGE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;Het contract specificeert dat de
AsyncRequest
door de initiërende service naar de doelservice wordt verzonden en dat de doelservice eenAsyncResult
retourneert bericht terug naar de initiërende service. Het contract kan ook meerdere berichttypen specificeren voor de initiator en het doel, of dat een specifiek berichttype door elke service kan worden verzonden, als de specifieke verwerking dit vereist.-- Maak het contractCREATE CONTRACT [AsyncContract] ( [AsyncRequest] VERZONDEN DOOR INITIATOR, [AsyncResult] VERZONDEN DOOR TARGET);Voor elk van de services moet een wachtrij worden gemaakt om de berichten die door de service worden ontvangen, op te slaan. De doelservice waarnaar het verzoek wordt verzonden, moet worden gemaakt met vermelding van het
AsyncContract
om berichten naar de dienst te kunnen verzenden. In dit geval heet de serviceProcessingService
en wordt aangemaakt in deProcessingQueue
binnen de databank. Voor de initiërende service hoeft geen contract te worden opgegeven, waardoor deze alleen berichten kan ontvangen als reactie op een gesprek dat ermee is gestart.-- Maak de verwerkingswachtrij en service - specificeer het contract om verzending naar de service toe te staan. CREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Maak de aanvraagwachtrij en service CREATE QUEUE RequestQueue;CREATE SERVICE [RequestService] ON QUEUE RequestQueue;Een bericht verzenden voor verwerking
Zoals ik in het vorige artikel heb uitgelegd, geef ik er de voorkeur aan om een in de wrapper opgeslagen procedure te implementeren voor het verzenden van een nieuw bericht naar een brokerservice, zodat het eenmalig kan worden gewijzigd om de prestaties indien nodig te schalen. Deze procedure is een eenvoudige wrapper rond het maken van een nieuw gesprek en het verzenden van het bericht naar de
ProcessingService
.-- Maak de wrapper-procedure voor het verzenden van berichtenCREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; BEGIN TRANSACTIE; BEGIN DIALOOGGESPREK @conversation_handle VAN SERVICE @FromService NAAR SERVICE @ToService OP CONTRACT @Contract MET ENCRYPTIE =UIT; VERZENDEN IN GESPREK @conversation_handle TYPE BOODSCHAP @MessageType(@MessageBody); TRANSACTIE COMMIT;ENDGOMet behulp van de in de wrapper opgeslagen procedure kunnen we nu een testbericht sturen naar de
ProcessingService
om te valideren dat we de makelaarsdiensten correct hebben ingesteld.-- Stuur een requestEXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N''; -- Controleer op bericht in de verwerkingswachtrij SELECT CAST (message_body AS XML) FROM ProcessingQueue;GO 12345 Berichten verwerken
Terwijl we de berichten van de
ProcessingQueue
. handmatig konden verwerken , willen we waarschijnlijk dat de berichten automatisch worden verwerkt wanneer ze worden verzonden naar deProcessingService
. Om dit te doen, moet een opgeslagen activeringsprocedure worden gemaakt die we zullen testen en later aan de wachtrij zullen binden om de verwerking bij activering van de wachtrij te automatiseren. Om een bericht te verwerken moeten weRECEIVE
het bericht uit de wachtrij binnen een transactie, samen met het berichttype en de gesprekshandle voor het bericht. Het berichttype zorgt ervoor dat de juiste logica wordt toegepast op het bericht dat wordt verwerkt, en de gesprekshandle zorgt ervoor dat een antwoord kan worden teruggestuurd naar de initiërende service wanneer het bericht is verwerkt.De
RECEIVE
Met de opdracht kan een enkel bericht of meerdere berichten binnen dezelfde gesprekshandle of -groep in één transactie worden verwerkt. Om meerdere berichten te verwerken, moet een tabelvariabele worden gebruikt, of om een enkel bericht te verwerken, kan een lokale variabele worden gebruikt. De onderstaande activeringsprocedure haalt een enkel bericht op uit de wachtrij, controleert het berichttype om te bepalen of het eenAsyncRequest
is bericht en voert vervolgens het langlopende proces uit op basis van de ontvangen berichtinformatie. Als het geen bericht binnen de lus ontvangt, wacht het tot 5000 ms, of 5 seconden, totdat een ander bericht de wachtrij binnenkomt voordat het de lus verlaat en de uitvoering ervan beëindigt. Na het verwerken van een bericht, bouwt het eenAsyncResult
bericht en stuurt het terug naar de initiator op dezelfde gesprekshandle als waarvan het bericht is ontvangen. De procedure controleert ook het berichttype om te bepalen of eenEndDialog
ofError
bericht is ontvangen om het gesprek op te schonen door het te beëindigen.-- Maak verwerkingsprocedure voor verwerking wachtrijCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN STEL NOCOUNT IN; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; VERKLAREN @message_body XML; VERKLAREN @message_type_name sysname; TERWIJL (1=1) BEGIN BEGIN TRANSACTIE; WACHTEN (BOVENAAN ONTVANGST (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM ProcessingQueue), TIMEOUT 5000; ALS (@@ROWCOUNT =0) BEGIN MET ROLLBACK-TRANSACTIE; PAUZE; END IF @message_type_name =N'AsyncRequest' BEGIN -- Handel hier complexe lange verwerking af -- Voor demonstratie trekken we het accountnummer en sturen we alleen een antwoord DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]', 'INT'); -- Stel een antwoordbericht samen en stuur het terug DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber AS NVARCHAR (11)) + ' '; VERZENDEN IN GESPREK @conversation_handle BERICHT TYPE [AsyncResult] (@reply_message_body); END -- Als het dialoogbericht wordt beëindigd, beëindigt u het dialoogvenster ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN EINDE CONVERSATIE @conversation_handle; EINDE -- Indien foutmelding, log en beëindig het gesprek ANDERS IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log de foutcode in en voer de vereiste afhandeling hier uit -- End het gesprek voor de fout END CONVERSATION @conversation_handle; EINDE COMMIT TRANSACTIE; ENDENDGODe
RequestQueue
zal ook de berichten moeten verwerken die ernaar worden verzonden, dus een extra procedure voor het verwerken van hetAsyncResult
berichten die door de ProcessingQueueActivation-procedure worden geretourneerd, moeten worden gemaakt. Aangezien we weten dat het AsnycResult-bericht betekent dat al het verwerkingswerk is voltooid, kan het gesprek worden beëindigd zodra we dat bericht hebben verwerkt, waardoor een EndDialog-bericht naar de ProcessingService wordt verzonden, dat vervolgens wordt verwerkt door de activeringsprocedure om de gesprek om alles op te ruimen en het vuur te vermijden en vergeet de problemen die optreden als gesprekken correct worden beëindigd.-- Maak een procedure voor het verwerken van antwoorden op de aanvraagwachtrijCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN STEL NOCOUNT IN; VERKLAREN @conversation_handle UNIEKE IDENTIFICATIE; VERKLAREN @message_body XML; VERKLAREN @message_type_name sysname; TERWIJL (1=1) BEGIN BEGIN TRANSACTIE; WACHTEN ( ONTVANG TOP (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM RequestQueue), TIMEOUT 5000; ALS (@@ROWCOUNT =0) BEGIN MET ROLLBACK-TRANSACTIE; PAUZE; END IF @message_type_name =N'AsyncResult' BEGIN -- Indien nodig behandel het antwoordbericht hier DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Aangezien dit al het werk is dat gedaan wordt, beëindigt u het gesprek om het EndDialog-bericht te verzenden END CONVERSATION @conversation_handle; END -- Als het dialoogbericht wordt beëindigd, beëindigt u het dialoogvenster ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN EINDE CONVERSATIE @conversation_handle; EINDE -- Als foutmelding, log en beëindig het gesprek ANDERS ALS @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN EINDE GESPREK @conversation_handle; EINDE COMMIT TRANSACTIE; ENDENDGODe procedures testen
Voordat we de wachtrijverwerking voor onze services automatiseren, is het belangrijk om de activeringsprocedures te testen om ervoor te zorgen dat ze de berichten op de juiste manier verwerken en om te voorkomen dat een wachtrij wordt uitgeschakeld als er een fout optreedt die niet correct wordt afgehandeld. Aangezien er al een bericht is in de
ProcessingQueue
deProcessingQueueActivation
procedure kan worden uitgevoerd om dat bericht te verwerken. Houd er rekening mee dat deWAITFOR
zal ervoor zorgen dat de procedure 5 seconden duurt om te beëindigen, ook al wordt het bericht onmiddellijk vanuit de wachtrij verwerkt. Nadat we het bericht hebben verwerkt, kunnen we controleren of de procedure correct werkte door deRequestQueue
. op te vragen om te zien of eenAsyncResult
bericht bestaat, en dan kunnen we verifiëren dat deRequestQueueActivation
procedure correct werkt door het uit te voeren.-- Verwerk het bericht uit de verwerkingswachtrijEXECUTE dbo.ProcessingQueueActivation;GO -- Controleer op antwoordbericht op verzoek wachtrijSELECT CAST(message_body AS XML) FROM RequestQueue;GO -- Verwerk het bericht uit de verzoekwachtrijEXECUTE dbo.RequestQueueActivation;GODe verwerking automatiseren
Op dit moment zijn alle componenten voltooid om onze verwerking volledig te automatiseren. Het enige dat overblijft is om de activeringsprocedures aan hun juiste wachtrijen te binden en vervolgens een ander testbericht te sturen om te valideren dat het wordt verwerkt en dat er daarna niets meer in de wachtrijen staat.
-- Wijzig de verwerkingswachtrij om interne activering op te gevenALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS =ON, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, EXECUTE AS SELF );GO -- Wijzig de aanvraagwachtrij om interne activering op te gevenALTER QUEUE RequestQueu MET ACTIVERING (STATUS =AAN, PROCEDURE_NAME =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, UITVOEREN ALS ZELF);GO -- Test automatische activering-- Stuur een verzoek UITVOEREN dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N' ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N''; -- Controleer op bericht in de verwerkingswachtrij -- er is niets omdat het automatisch is verwerkt SELECT CAST (message_body AS XML) FROM ProcessingQueue; GO -- Controleer op antwoordbericht in de wachtrij voor verzoeken -- er is niets omdat het automatisch werd verwerkt SELECT CAST (bericht_body AS XML) VANUIT RequestQueue;GO 12345 Samenvatting
De basiscomponenten voor geautomatiseerde asynchrone verwerking in SQL Server Service Broker kunnen worden geconfigureerd in een enkele databaseconfiguratie om ontkoppelde verwerking van langlopende taken mogelijk te maken. Dit kan een krachtig hulpmiddel zijn om de applicatieprestaties te verbeteren, vanuit de ervaring van een eindgebruiker, door de verwerking los te koppelen van de interacties van de eindgebruiker met de applicatie.