-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunction_app.py
More file actions
204 lines (168 loc) · 9.69 KB
/
function_app.py
File metadata and controls
204 lines (168 loc) · 9.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
############################################## TODO #######################################################
# Get Multiple PDFs
## Get Page of RA
## Get key values from RA either by code or LLM or Both (see microsoft "schema_entity_extraction")
## Manual Check
## Check for specific words in RA page ( see microsoft schema_keywords_ra)
# Confidence score
## Document intelligence - average all words scores
## Manual confidence score check (see microsoft)
## Error Handling - create separate error handling file
## Embeddings for large documents OR set token limitations
import azure.functions as func
from azure.storage.blob import BlobServiceClient
import logging
import os
import base64
import uuid
## import local files
import src.run_DocumentIntelligence as docintel
import src.run_LLMClasscification as llmclass
import src.run_FabricEventHub as eventhub
########################## Parameters ##########################
# Fabric endpoints
# Event Hub connection details used to publish classification records to Fabric.
eventhub_endpoint = os.getenv("Eventhub_endpoint")
eventhub_name = os.getenv("Eventhub_name")
# Azure Document Intelligence endpoint and key (key served via APIM per the variable name).
docintel_endpoint = os.environ.get("docintelligenceendpoint")
docintel_apim_key = os.environ.get("docintelligencekey")
docintel_modelId = "prebuilt-layout" # "prebuilt-document" : General doesnt exist in RSA
# Blob container holding both the Input/ drop zone and the Output/ results folder.
container_name="document-processing-dropzone"
# Validate required environment variables
# NOTE: docintel_modelId and container_name are hard-coded above, so only the
# four environment-sourced values are checked here.
required_vars = {
    "Eventhub_endpoint": eventhub_endpoint,
    "Eventhub_name": eventhub_name,
    "docintelligenceendpoint": docintel_endpoint,
    "docintelligencekey": docintel_apim_key,
}
missing = [k for k, v in required_vars.items() if not v]
if missing:
    # Fail fast at import time so a misconfigured deployment is caught immediately,
    # instead of failing on the first triggered invocation.
    raise ValueError(f"Missing required environment variables: {missing}")
############################################################################ Helper Functions ###################################################
def format_final_output(run_id: str, llm_response: dict, pdf_size: int) -> str:
    """Build the human-readable result summary written to the Output/ blob.

    Args:
        run_id: Unique identifier generated for this processing run.
        llm_response: Classification record; must contain 'FileName', 'Datetime',
            'Result', 'Explanation' and 'ExtractedText'. 'Confidence' is optional
            and rendered as 'N/A' when absent.
        pdf_size: Size of the processed PDF in bytes.

    Returns:
        Multi-line summary string combining run metadata, the LLM verdict and
        the full extracted text.
    """
    confidence = llm_response.get('Confidence', 'N/A')
    sections = [
        "Document processing executed successfully",
        f"Run ID: {run_id}",
        f"FileName:{llm_response['FileName']}",
        f"File size: {pdf_size} bytes",
        f"Date Processed: {llm_response['Datetime']}",
        "",
        f"LLM Result: {llm_response['Result']}",
        f"LLM Confidence: {confidence}",
        f"LLM Explanation: {llm_response['Explanation']}",
        "",
        f"Extracted Text:\n{llm_response['ExtractedText']}",
    ]
    return "\n".join(sections)
########################################################################### App settings ####################################################
# Function app entry point. AuthLevel.FUNCTION means HTTP calls require a function key;
# switch to the plain func.FunctionApp() constructor for anonymous access.
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
###################################################################################################################################################
########################################################################### Blob Trigger ##########################################################
### Def Function name (blob_documentprocessing) determines the deployed function name in Azure
### Def Function name (blob_documentprocessing) determines the deployed function name in Azure
@app.blob_trigger(arg_name="myblob", path="document-processing-dropzone/Input/{name}",
                  connection="rgdocumentprocessinb772_STORAGE")
def blob_documentprocessing(myblob: func.InputStream):
    """Blob-triggered pipeline for PDFs dropped in Input/.

    Steps: read the blob, OCR it via Document Intelligence, classify the
    extracted text with the LLM, write the combined summary to Output/, and
    publish the classification record to the Fabric Event Hub.

    Blob-save and Event Hub publication are best-effort: their failures are
    logged but do not abort the run. Failures reading the blob or during
    Document Intelligence abort the run (logged, no re-raise — the trigger
    completes so the blob is not retried).
    """
    run_id = str(uuid.uuid4())
    # Lazy %-style args so formatting is skipped when the log level filters this out.
    logging.info("Python blob trigger function processed\nRun ID: %s\nFile Name: %s\nBlob Size: %s bytes\n",
                 run_id, myblob.name, myblob.length)
    try:
        pdf_bytes = myblob.read()
        if not pdf_bytes:
            raise ValueError("Unable to read blob file")
        pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8")
        logging.info("File read and converted to base64, with file size: %d bytes\n", len(pdf_bytes))
        # Derive the output file name (sans extension) from the blob path.
        full_file_name = os.path.basename(myblob.name)
        file_name = os.path.splitext(full_file_name)[0]
        logging.info("Name:%s\n", full_file_name)
        # ########################## Call Document Intelligence endpoint
        docintel_output = docintel.run_document_intelligence(pdf_base64, docintel_apim_key, docintel_endpoint)
        # Process results into text
        full_text = docintel.process_results(docintel_output)
        logging.info("Text extracted from Document Intelligence, results:\n %s\n", full_text)
        ########################## Call LLM Classification
        try:
            client, system_prompt, user_prompt, example_text, example_response = llmclass.initialize_llm_inputs()
            # Raw completion object is not used here; only the parsed response is kept.
            _completion, llm_response = llmclass.run_llm_classification(
                client, full_text, full_file_name, system_prompt, user_prompt, example_text, example_response)
            llm_response = {"RunId": run_id, **llm_response}
            ########################## Save results to Blob Storage (best-effort)
            try:
                blob_service_client = BlobServiceClient.from_connection_string(
                    os.environ["rgdocumentprocessinb772_STORAGE"])
                container_client = blob_service_client.get_container_client(container_name)
                logging.info("Connection set to upload text to blob\n")
                # Save both docintel and llm results as a single text summary.
                final_output = format_final_output(run_id, llm_response, pdf_size=len(pdf_bytes))
                container_client.upload_blob(name=f"Output/{file_name}.txt", data=final_output, overwrite=True)
                logging.info("Uploaded text file to blob storage successfully\n")
            except Exception:
                # Deliberate swallow: a failed save must not block the Event Hub publish.
                logging.exception("Error during file save to container ")
            ########################## Fabric Event Hub - Save data (best-effort)
            try:
                eventhub.eventhub_save(eventhub_endpoint, eventhub_name, llm_response)
                logging.info("Event Hub save successfully\n")
                logging.info("Completed with results:\n%s\n", llm_response)
            except Exception:
                logging.exception("Error during Event Hub save ")
        ########## Exceptions
        except Exception:
            logging.exception("Error during LLM Classification")
    except ValueError as ve:
        logging.error("ValueError during document processing: %s", ve)
    except Exception as e:
        logging.exception("Unexpected error during document processing: %s", str(e))
###################################################################################################################################################
########################################################################### HTTP Trigger ##########################################################
@app.route(route="func_document_processing") # this is used in the url path
def http_documentprocessing(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered variant of the document pipeline.

    Expects the raw PDF bytes in the request body and an optional
    ``?filename=`` query parameter. Runs Document Intelligence OCR, LLM
    classification and Event Hub publication, returning the formatted
    summary on success.

    Returns:
        200 with the summary text on full success;
        400 when no body is provided (ValueError path);
        500 when the LLM classification or the Event Hub save fails.
    """
    run_id = str(uuid.uuid4())
    # Lazy %-style args so formatting is skipped when the log level filters this out.
    logging.info("Python HTTP trigger function processed a request\nRun ID: %s", run_id)
    try:
        pdf_bytes = req.get_body()
        # FIX: the original read the query param unconditionally ("doesnt work"
        # per the old comment) and passed None downstream when it was absent.
        # Fall back to a run-scoped placeholder so the LLM/Event Hub record
        # always has a file name; behavior is unchanged when the param is sent.
        full_file_name = req.params.get("filename") or f"upload-{run_id}.pdf"
        if not pdf_bytes:
            raise ValueError("No file provided in request body")
        # convert pdf to base64
        pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8")
        logging.info("File read and converted to base64, with file size: %d bytes\n", len(pdf_bytes))
        # ########################## Call Document Intelligence endpoint
        docintel_output = docintel.run_document_intelligence(pdf_base64, docintel_apim_key, docintel_endpoint)
        ########################## Process results into text
        full_text = docintel.process_results(docintel_output)
        logging.info("Text extracted from Document Intelligence, results:\n %s\n", full_text)
        ########################## Call LLM Classification
        try:
            client, system_prompt, user_prompt, example_text, example_response = llmclass.initialize_llm_inputs()
            # Raw completion object is not used here; only the parsed response is kept.
            _completion, llm_response = llmclass.run_llm_classification(
                client, full_text, full_file_name, system_prompt, user_prompt, example_text, example_response)
            llm_response = {"RunId": run_id, **llm_response}
            ########################## Fabric Event Hub - Save data
            try:
                eventhub.eventhub_save(eventhub_endpoint, eventhub_name, llm_response)
                logging.info("Event Hub save successfully\n")
                # Create output
                final_output = format_final_output(run_id, llm_response, pdf_size=len(pdf_bytes))
                logging.info("Completed with results:\n%s\n", llm_response)
                return func.HttpResponse(body=final_output, status_code=200)
            except Exception as e:
                logging.exception("Error during Event Hub save ")
                return func.HttpResponse(
                    body=f"FAILED to save to Event Hub.Event Hub Error: {str(e)}",
                    status_code=500)
        ########## Exceptions
        except Exception as e:
            logging.exception("Error during LLM Classification")
            return func.HttpResponse(
                body=f"Document processing SUCCESSFUL, but LLM Classification FAILED.\nDocument Intelligence Output:\n{docintel_output}\n\nLLM Error: {str(e)}",
                status_code=500
            )
    except ValueError as ve:
        logging.error("ValueError during document processing: %s", ve)
        return func.HttpResponse(
            body=f"Document processing FAILED.\nError: {ve}",
            status_code=400
        )
    except Exception as e:
        logging.exception("Unexpected error during document processing: %s", str(e))
        return func.HttpResponse(
            body=f"Document processing FAILED.\nUnexpected Error: {str(e)}",
            status_code=500
        )