Though the term was not really in the vernacular as it is today, I have been a full-time or part-time “Data Engineer” my entire career. I have been quite comfortable with Microsoft ETL tools like SSIS for many years, dating back to the DTS days. My comfort with SSIS came from many years of trial and error, as well as from adhering to the best practices put forth and tested by many of my colleagues in the SQL Server field. It was and still is a widely used and well-documented ETL platform. With the release of Azure Data Factory several years ago, though it was not touted as an SSIS replacement, many data engineers started working with and documenting this code-free or low-code orchestration experience, and I was one of them.
As with any technology, only with knowledge and experience will you be able to take advantage of all its key benefits, and by the same token uncover its severe limitations. On a recent assignment to build a complex logical data workflow in Azure Data Factory, which ironically had less “data” and more “flow” to engineer, I discovered benefits and limitations not only in the tool itself but also in the documentation, which provided arcane and incomplete guidance at best. Some of the knowledge I was missing was intrinsic to Azure Logic Apps, which I grant I had done very little with until this project, but it played a pivotal role as an activity called from the pipeline. I wanted to share a few pieces of this project with you here in the hope of bolstering, however slightly, the available sources for quick insight into advanced challenges with ADF and, to a lesser extent, Azure Logic Apps.
Specifically, I was asked to create a pipeline-driven workflow that sends approval emails with a file attachment and waits for the recipients to either approve, reject or ignore the email. If the approvers do not respond to the emails in the time frame defined by several variables like time of day and type of file, then a reminder email must be sent. Again, the recipients can approve, reject or ignore the reminder. Finally, a third email is sent to yet another approver with the same options. Ultimately the process will either copy the approved file to a secure FTP site after both of the initial two recipients or the final recipient approves the file, or it will send an email to the business saying the file was rejected. It may sound simple enough, even in a flow diagram; however, there were several head-scratchers and frustrated, lengthy ceiling stares that I may have easily avoided with a bit of foreknowledge.
The following are the four challenges I had to overcome to call the project a success:
When sending an approval email from Azure Logic Apps, which is initiated via a Webhook activity from the ADF pipeline, how do I force a response by a specific time of day? For example, if the initial emails must be approved or rejected by 9:00 AM, and it is triggered at 8:26 AM which is itself a variable time to start, how do I force the email to return control to the pipeline in 34 minutes?
The second challenge came with the Webhook activity itself. The Logic App needed to return status values back to the calling pipeline. While there was some minimal documentation that explained that a callBackUri was needed in an HTTP POST from within the Logic App, what I found lacking was how to actually pass back values.
The third challenge was processing a rejection. The logic stated that if either of the initial approvers rejected the file, then the pipeline needed to stop further processing immediately and notify the business so a secondary file may be created and run through the workflow again. If the two initial emails to approvers were set to timeout after 34 minutes with no response (following the example above) and one of the approvers rejected the file in 3 minutes, the pipeline could not dilly dally for another 31 minutes spinning cycles waiting for the other approver.
Finally, each step in the process needed to be written to a logging table in an Azure SQL Database. That was not too difficult as it was a simple matter of passing dynamic values to a parameterized stored procedure. However, the number of times this needed to happen brought a much-unexpected consequence to my attention.
Setting a timeout value for the “Send approval email” action in Logic App
As I said earlier, I did not have much experience with designing Logic Apps going into the project, so I was a bit intimidated at first. The only other simple and expedient Logic App I had used with ADF pipelines in the past was for sending Success and Failure notifications during pipeline executions. You can see in the designer below that the Failure Notification Logic App consisted of two steps, an HTTP request action and a Send an email action. This is one of the few ways to send emails from ADF, as it is not natively supported.
The approval aspect of the Logic App was new for this project and that did require some additional experimenting. Within minutes, though, I was sending test emails to myself and approving or rejecting them, noting the behavior of the flow. If I did not approve or reject the email, the process would just continue to wait. It was not obvious on the surface if there was a timeout value I could set.
After fumbling around a bit, I checked the Settings of the Send approval email action in the Logic App and found a timeout duration that could be set using the ISO 8601 format. I was not familiar with the format, so I had to look it up. Below is an example of the ISO 8601 duration I would need to use to set the timeout value, where “P” is the period (duration) designator, “T” is the time designator that precedes the time components, “30” is the number of minutes and “M” is the minute designator. Simple enough.
PT30M
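As a minimal sketch of how such a value is put together, the Python snippet below builds a minutes-only ISO 8601 duration. The function name is hypothetical and is not part of ADF or Logic Apps; it only illustrates the format.

```python
def minutes_to_iso8601(minutes: int) -> str:
    """Format a whole number of minutes as an ISO 8601 duration, e.g. 30 -> 'PT30M'."""
    if minutes < 0:
        raise ValueError("a timeout duration cannot be negative")
    # 'P' opens the duration, 'T' introduces the time part, 'M' marks minutes.
    return f"PT{minutes}M"


print(minutes_to_iso8601(30))  # PT30M
```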
I now had a way to control the timeout behavior of the send approval email action so that it did not wait indefinitely if no one approved or rejected the emails. Now came the challenge of dynamically setting the number of minutes before forcing a timeout based on the time of day and file type. I needed to set this value in the pipeline and then pass this value to the Logic App that would do the work of actually sending the email.
I used an If Condition activity in the pipeline to set the ISO 8601-formatted value that is passed to the Logic App.
The conditional equated to true or false based on the value of a pipeline parameter called RateSheetType. The expression used for this evaluation is @equals(pipeline().parameters.RatesheetType,'Daily'). If it returned true, meaning it was a daily run, then the conditional executed a Set variable activity to set the TimeoutMinutes value to be the difference in minutes from the current time to 9:00 AM, assuming that the pipeline would have been triggered well before 9:00 AM.
Two other conditions are also checked: the difference between now and 9:00 AM must be greater than zero, meaning it is not yet past 9:00 AM, and it must be less than two hours (120 minutes). If both are true, the timeout is set to that difference; otherwise it is set to a default of X minutes, in this case 30. This default mainly exists to allow enough time for a response prior to 9:00 AM and for a reminder to be sent.
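The rule is easier to follow outside of any particular expression language. Here is a rough Python sketch of it, under the assumption that the current time is already expressed in local (Eastern) time. The function and parameter names are made up for illustration and do not appear in the pipeline.

```python
from datetime import datetime


def timeout_value(now: datetime, deadline_hour: int = 9,
                  window_minutes: int = 120, default_minutes: int = 30) -> str:
    """Return an ISO 8601 timeout for the approval email.

    `now` is assumed to already be in the local (Eastern) time zone.
    """
    deadline = now.replace(hour=deadline_hour, minute=0, second=0, microsecond=0)
    # Whole minutes remaining until the deadline; negative once it has passed.
    minutes_left = int((deadline - now).total_seconds() // 60)
    if 0 < minutes_left < window_minutes:
        return f"PT{minutes_left}M"
    return f"PT{default_minutes}M"


print(timeout_value(datetime(2020, 10, 5, 8, 5)))   # PT55M
print(timeout_value(datetime(2020, 10, 5, 9, 30)))  # PT30M (past the deadline)
print(timeout_value(datetime(2020, 10, 5, 6, 0)))   # PT30M (more than two hours early)
```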
I am going to go out on a limb here and say, as a career T-SQL developer and DBA, writing expressions in ADF is in a word, exasperating. Take, for example, the T-SQL code required to determine the timeout value from now until 9:00 AM on the same day. In this case, assume I am running the code at 8:05 AM, and I would expect the difference to be 55.
You can see it is fairly straightforward, with minimal coding effort, mainly due to the support for tried-and-true date/time functions and an effective CASE statement.
-- 9:00 AM today = midnight today (the DATEADD/DATEDIFF idiom) plus 9 hours
SELECT CASE
         WHEN DATEDIFF(mi, GETDATE(), DATEADD(hh, 9, DATEADD(dd, 0, DATEDIFF(dd, 0, GETDATE())))) > 0
          AND DATEDIFF(mi, GETDATE(), DATEADD(hh, 9, DATEADD(dd, 0, DATEDIFF(dd, 0, GETDATE())))) < 120
         -- varchar(3) because the difference can be as large as 119
         THEN 'PT' + CAST(DATEDIFF(mi, GETDATE(), DATEADD(hh, 9, DATEADD(dd, 0, DATEDIFF(dd, 0, GETDATE())))) AS varchar(3)) + 'M'
         ELSE 'PT30M'
       END AS TimeoutValue
By contrast, the data factory expression to derive the same values is:
@If(and(lessOrEquals(div(div(mul(sub(ticks(convertFromUtc(concat(formatDateTime(utcNow(),'yyyy-MM-dd'),'T09:00:00Z'),'Eastern Standard Time')),ticks(formatDateTime(utcNow(),'yyyy-MM-ddTHH:mm:ssZ'))),100),1000000000),60),120),greaterOrEquals(div(div(mul(sub(ticks(convertFromUtc(concat(formatDateTime(utcNow(),'yyyy-MM-dd'),'T09:00:00Z'),'Eastern Standard Time')),ticks(formatDateTime(utcNow(),'yyyy-MM-ddTHH:mm:ssZ'))),100),1000000000),60),0)),concat('PT',string(div(div(mul(sub(ticks(convertFromUtc(concat(formatDateTime(utcNow(),'yyyy-MM-dd'),'T09:00:00Z'),'Eastern Standard Time')),ticks(formatDateTime(utcNow(),'yyyy-MM-ddTHH:mm:ssZ'))),100),1000000000),60)),'M'),'PT30M')
I will grant you there are additional functions used in the expression that make it a bit more complex, such as converting from UTC to Eastern Standard Time. However, the time to develop the logic initially was orders of magnitude more than for the SQL equivalent. This is partly due to the nested nature of the expression as well as the unfamiliar syntax. But it was made worse by the lack of common DateDiff and DateAdd functionality. You can see that to determine the difference in minutes from now to 9:00 AM, the arcane ticks() function was employed in combination with the math functions div() and mul(). ticks() returns the number of 100-nanosecond intervals that have elapsed since January 1st, 0001, which is why the difference is multiplied by 100 to get nanoseconds, then divided by 1,000,000,000 to get seconds and by 60 to get minutes.
greaterOrEquals(div(div(mul(sub(ticks(convertFromUtc(concat(formatDateTime(utcNow(),'yyyy-MM-dd'),'T09:00:00Z'),'Eastern Standard Time')),ticks(formatDateTime(utcNow(),'yyyy-MM-ddTHH:mm:ssZ'))),100),1000000000),60),0)
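To make the tick arithmetic concrete, here is a small Python sketch that reproduces the mul/div chain on a known pair of timestamps. It assumes one tick is 100 nanoseconds counted from January 1st, 0001; the helper names are hypothetical, and the integer division only mirrors the expression for non-negative differences.

```python
from datetime import datetime, timedelta

TICKS_PER_MICROSECOND = 10  # one tick is 100 nanoseconds


def to_ticks(moment: datetime) -> int:
    """Number of 100-nanosecond ticks since 0001-01-01T00:00:00."""
    return ((moment - datetime(1, 1, 1)) // timedelta(microseconds=1)) * TICKS_PER_MICROSECOND


def tick_difference_to_minutes(tick_diff: int) -> int:
    """Mirror div(div(mul(diff, 100), 1000000000), 60) for a non-negative difference."""
    nanoseconds = tick_diff * 100
    seconds = nanoseconds // 1_000_000_000
    return seconds // 60


start = datetime(2020, 10, 5, 8, 5)
deadline = datetime(2020, 10, 5, 9, 0)
print(tick_difference_to_minutes(to_ticks(deadline) - to_ticks(start)))  # 55
```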
In the end, I chose to use the expression over the T-SQL code simply because I wanted to avoid devoting an entire dataset to calling a stored procedure that returns a five-character string value, along with the additional overhead that would have entailed. I will note here that if you decide to use a stored procedure to return a value, you will need to use the Lookup activity rather than the Execute Stored Procedure activity, because the latter has no way of returning output for further processing.
Now that the timeout value was working and the variable set, the next challenge was to pass that value to the Logic App.
Passing values to and from Logic Apps
Building a Logic App could take up an article in itself so to keep this as simple as possible, I want first to show you the HTTP Request that is called from the ADF pipeline via a Webhook activity. The Webhook supports a callback URI capability, unlike a simple Web activity. The callback functionality is needed to return values to the pipeline. In the screenshot below, you can see the Logic App request and the request body, which are the property values that the request will receive when called. Those property values can be passed to the Logic App and back to the calling application, along with additional values derived directly from the Logic App itself, such as the status of the email approval activity.
In the Data Factory pipeline screenshot below, you can see that the URL will be the POST URL from the Logic App HTTP Request and the Method is POST. It is important to also specify the header Name and Value as Content-Type and application/json respectively. Finally, the Body will contain the JSON-formatted values to pass to the request, again noting that the value of the TimeoutMinutes variable is assigned to the property expected by the Logic App request, TimeOutValue. Using the @json() function to format the string is a bit tedious until you become acquainted with the syntax.
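For readers who think better in code than in screenshots, the shape of the request body can be sketched in Python as below. Only TimeOutValue comes from the text; the function name is hypothetical, and a real body would carry the workflow's other properties too. The Webhook activity itself appends callBackUri to the body, so it is not set here.

```python
import json


def build_webhook_body(timeout_value: str) -> str:
    """Serialize the properties the Logic App request is expected to receive."""
    payload = {
        # TimeOutValue is the property named in the text; a real body would
        # carry the workflow's other properties as well.
        "TimeOutValue": timeout_value,
    }
    return json.dumps(payload)


print(build_webhook_body("PT34M"))  # {"TimeOutValue": "PT34M"}
```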
The timeout value for the Send approval email activity in the Logic App is not visible by default and can only be seen by going to Settings or looking for it in Code view. The value for the timeout will be set to @triggerBody()?['TimeOutValue'].
That takes care of the timeout value passed to the HTTP request that initiates the Logic App, but what about returning the email approval status back to the ADF pipeline? It turns out that is pretty straightforward as well, though it has not been well documented as far as I could discover.
You can see in the following screenshot that the Logic App executes an HTTP POST action after the Send approval email action. In the POST, all that is required is the callBackUri value, @{triggerBody().callBackUri}, as well as the “Output” values you wish to pass back to the calling pipeline. In this case, I wanted to receive the ApprovalStatus, which equated to the option the approver selected in the approval email: Approve, Reject or Null (for no selection or timeout). There are other values I was interested in, like a final status, but for the sake of this example, I will stick with the ApprovalStatus value.
I would like to note at this point that it was not apparent at first how to add the output values to the Body of the HTTP POST. I had to turn to Code view again and just got lucky by placing a similarly formatted JSON block in the Body section. After I did, lo and behold, the graphic displayed the expected and validated values. The final step to make this an end-to-end success was to test the values as they were returned to the pipeline. This leads into the third challenge, determining if the approval emails were rejected.
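The callback itself is nothing more than an HTTP POST of a JSON document to the URI the pipeline supplied. Below is a hedged Python sketch of an equivalent request. It assumes the body wraps the returned values in an Output object, as described above; the exact body used in the Logic App is not shown in this article, and the function name and example URI are hypothetical.

```python
import json
import urllib.request


def build_callback_request(callback_uri: str, approval_status: str) -> urllib.request.Request:
    """Build (but do not send) the POST that hands control back to the pipeline."""
    body = {"Output": {"ApprovalStatus": approval_status}}  # assumed shape
    return urllib.request.Request(
        callback_uri,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_callback_request("https://example.invalid/callback", "Reject")
print(request.get_method(), request.data.decode("utf-8"))
# Sending it would be: urllib.request.urlopen(request)
```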
Dealing with rejection
To verify that I was receiving the approval status from the Logic App back to the pipeline as described above, I checked the value of the output of the Email_First_Approver_1 Webhook activity after manually rejecting a test email sent to my account. You can see in the screenshot below from a complete pipeline run that indeed I had received back a Reject ApprovalStatus value.
From here, it was an easy matter of using this value in another If Condition activity for a rejected approval. As the workflow logic dictated, if either approver rejected the email, then the entire pipeline should stop processing at that point. The evaluation expression in the If Condition was a simple function:
@equals(activity('Email_First_Approver_1').output.ApprovalStatus,'Reject')
This is where things got a little interesting and messy. You can see in the above screenshot that there are four activities in the True container which would be executed if the ApprovalStatus equated to Reject. Three of those activities were responsible for setting a variable, sending a final notification that the initial email was rejected and writing the status out to a SQL logging table, which I will cover in the next section. I would refer to the fourth activity as a hack or a workaround to overcome one of several limitations of the current version of ADF, and that is the inability to terminate execution of a pipeline based on an evaluation. It should be a simple matter of terminating a pipeline run if the email is rejected. In Logic Apps, by comparison, there is a Terminate control.
This concept does not, to my knowledge, exist in ADF. There are a couple of Azure feedback items that address this.
https://feedback.azure.com/forums/270578-data-factory/suggestions/34301983-throw-error-activity
I suppose it is possible to self-cancel the pipeline run based on a value, but I did not explore that option. Here is a link in case you would like to explore dynamically building the URL string and POSTing a cancellation of your pipeline run instead of using the solution I chose.
https://docs.microsoft.com/en-us/rest/api/datafactory/pipelineruns/cancel
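I did not go down this road, so treat the following as an untested sketch of what "dynamically creating a URL string" might look like, based on the URL pattern documented for the Cancel operation. The function name and the sample values are placeholders, and an actual call would also need an Azure AD bearer token in the Authorization header.

```python
def cancel_run_url(subscription_id: str, resource_group: str,
                   factory_name: str, run_id: str,
                   api_version: str = "2018-06-01") -> str:
    """Build the management URL that a POST would target to cancel a pipeline run."""
    return (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory/factories/{factory_name}"
        f"/pipelineruns/{run_id}/cancel"
        f"?api-version={api_version}"
    )


# All four values below are placeholders, not real resources.
print(cancel_run_url("my-subscription-id", "my-resource-group", "my-factory", "my-run-id"))
```

Inside a pipeline, the run ID and factory name would presumably come from the pipeline().RunId and pipeline().DataFactory system variables.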
The path I chose was actually mentioned in one of the feedback links, which is to intentionally generate an error in the pipeline to force it to stop. Prior to the termination, it is possible to write out a log entry noting the intentional stoppage. The below stored procedure, not my proudest achievement in SQL development history, serves the purpose of killing my pipeline upon rejection and can be called with an Execute Stored Procedure activity right after the log entry is written.
CREATE PROCEDURE [dbo].[spThrowError]
AS
BEGIN
    -- RAISERROR with severity 15 returns an error to the caller, which fails the calling activity
    RAISERROR('Forcing Pipeline To Stop', 15, 1)
END
GO
Logging it all, and you are kidding me
Up to this point, I have demonstrated only a few of the overall pipeline activities for the approval process. In the final solution there are many more control tasks that send reminders, move files from source to sink locations, and as mentioned, write out log entries to a SQL table at almost every step. I created another simple stored procedure that performed this logging and placed it amply throughout the pipeline so that analysts could use the information to streamline the process in the future. It would be helpful to know, for example, how often the emails were being rejected or the duration of time between when the emails were sent and when they were approved. In this case, time really was money.
Below is the code for the stored procedure:
CREATE PROCEDURE [dbo].[spInsertPipelineExecution]
(
    @PipelineId uniqueidentifier
  , @PipelineStatus nvarchar(600)
  , @PipelineStartDate datetime2
  , @PipelineEndDate datetime2
  , @TriggerName nvarchar(50)
  , @ActivityName nvarchar(1000)
  , @ApproverEmail nvarchar(1000)
  , @EmailStatus nvarchar(1000)
  , @FileName nvarchar(1000)
)
AS
BEGIN
    DECLARE @clean_PipelineRunId UNIQUEIDENTIFIER = CAST(REPLACE(REPLACE(@PipelineId, '}', ''), '{', '') AS UNIQUEIDENTIFIER);

    INSERT INTO [dbo].[RateSheetApprovalPipeline]
    (
        [PipelineID]
      , [PipelineStatus]
      , [PipelineStartDate]
      , [PipelineEndDate]
      , [TriggerName]
      , [ActivityName]
      , [ApproverEmail]
      , [EmailStatus]
      , [FileName]
    )
    VALUES
    (
        @clean_PipelineRunId
      , @PipelineStatus
      , @PipelineStartDate
      , @PipelineEndDate
      , @TriggerName
      , @ActivityName
      , @ApproverEmail
      , @EmailStatus
      , @FileName
    )
END
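The only logic in the procedure beyond the insert is the brace-stripping of the run ID. For illustration only, the same step can be sketched in Python as follows; the function name and the sample GUID are hypothetical.

```python
import uuid


def clean_run_id(raw_run_id: str) -> uuid.UUID:
    """Drop surrounding braces, if any, and parse the run ID as a GUID."""
    return uuid.UUID(raw_run_id.replace("{", "").replace("}", ""))


print(clean_run_id("{6f9619ff-8b86-d011-b42d-00c04fc964ff}"))
# 6f9619ff-8b86-d011-b42d-00c04fc964ff
```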
The values passed to the stored procedure would, in almost all cases, be dynamic such as is the case for PipelineID, PipelineStartDate and PipelineEndDate (which truth be told is a misnomer as this equates to an activity end date and time predominantly). At any rate, you can see the passing of some of these values at various stages of execution below.
Below is a representation of the rows for different pipeline runs. The timeout value of PT2M was set for my testing purposes. I did not want to wait around for 20-plus minutes for the emails to time out while debugging.
So this brings me to the unexpected surprise, which really should not have been a surprise after all of my many years working within the constraints of the technologies I implement. I had initially wanted to do all of the logic for the workflow and logging steps in one big pipeline. It was not until I was finishing up the last few tasks that I hit the limit of 40 activities per pipeline. I do not have a screenshot to share on this one, but trust me, if you try to add more than 40, you will not be able to publish the pipeline. Other Data Factory limits are listed in Microsoft's documentation on Azure subscription and service limits, quotas, and constraints.
Needless to say, I had to take a more modular approach and to be honest, that was probably for the best. To build a modular pipeline design does bring its own challenges, however, which I will be happy to delve into in the future.
Conclusion
For now, I am happy to pass along a few of the gaps I had to fill in with ADF. I am sure there are better (and worse) ways of doing some of the things I have demonstrated here. I am also sure that Microsoft will review the feedback and at some point in the future will address the concerns raised here as well as others. As I said at the outset, knowing the benefits and limitations upfront can save a lot of time and frustration. I hope that I have been able to shed a little more light on what you can do if you are tasked with building a fairly complex workflow pipeline in ADF and hit upon similar challenges.
The post Azure Data Factory pipelines: Filling in the gaps appeared first on Simple Talk.
from Simple Talk https://ift.tt/3ktbbid