Description
When a Flink Python UDF has third-party Python dependencies and is used in a Flink SQL query, the job fails with “ModuleNotFoundError: No module named xxxxxxx”.
To Reproduce
Step1: List the third-party dependencies in requirements.txt
e.g.
jsonpath-ng==1.5.3
Step2: Write the Python UDF and save it in a file named after the function, i.e. json_update_jpath.py
e.g.
from jsonpath_ng import parse
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def json_update_jpath(json_event, obj, jpath):
    json_data = json.loads(json_event)
    jpath_expr = parse(jpath)
    jpath_expr.update(json_data, obj)
    json_out_string = json.dumps(json_data)
    return json_out_string
Step3: Set the following Python UDF configuration
{
  "PYTHON_FILES": "gs://*****/python-udf/master/json_update_jpath.py",
  "PYTHON_REQUIREMENTS": "gs://*****/python-udf/master/requirements.txt",
  "PYTHON_ARCHIVES": "gs://*****/python-udf/master/data.zip#data",
  "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000",
  "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000",
  "PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000"
}
Step4: Run the Dagger job and use the UDF in a Flink SQL query
SELECT
  json_update_jpath(
    '{"k1":"v1","k2":"v2","k3":"v3"}',
    'replaced_value',
    '$.k2'
  ) as json_updated_event
FROM
  `data_streams_0`
After following these steps, the exceptions below occur in the job-manager.
Cannot instantiate user-defined function 'json_update_jpath'
ModuleNotFoundError: No module named 'jsonpath_ng'.
Detailed error
org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716)
at io.odpf.dagger.core.StreamManager.registerOutputStream(StreamManager.java:185)
at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:104)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.ValidationException: Cannot instantiate user-defined function 'json_update_jpath'.
at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:219)
at org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:562)
at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624)
at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
... 18 more
Caused by: java.lang.IllegalStateException: Instantiating python function 'python_udfs.json_update_jpath.json_update_jpath' failed.
at org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:48)
at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
... 33 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
... 34 more
Caused by: org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2410, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/opt/flink/opt/python/pyflink.zip/pyflink/java_gateway.py", line 169, in getPythonFunction
udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/tmp/pyflink/207a6c73-f7ec-4d01-b2ba-ef3a82c18034/26bad213-4e33-4d22-9900-731ffba2fdb1/python_udfs/python_udfs/json_update_jpath.py", line 1, in <module>
from jsonpath_ng import parse
ModuleNotFoundError: No module named 'jsonpath_ng'
at org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476)
at org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy260.getPythonFunction(Unknown Source)
at org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47)
at org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131)
... 38 more
2022-10-19 12:31:27,580 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner - Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_322]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_322]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_322]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:104) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at io.odpf.dagger.core.StreamManager.registerOutputStream(StreamManager.java:185) ~[?:?]
at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:33) ~[?:?]
... 12 more
Expected behavior
The SQL query is expected to return {"k1":"v1","k2":"replaced_value","k3":"v3"}
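To make the expected result easy to check without Flink or jsonpath-ng, here is a minimal standard-library sketch of the same transformation. It is only a stand-in for the UDF: the helper name update_top_level_key is hypothetical, and it handles only simple paths of the form '$.key', not full JSONPath.

```python
import json


def update_top_level_key(json_event, obj, jpath):
    """Replace one top-level key of a JSON object.

    Stand-in for the UDF: supports only paths like '$.key'.
    """
    if not jpath.startswith("$.") or "." in jpath[2:] or "[" in jpath:
        raise ValueError("this sketch only supports paths like '$.key'")
    key = jpath[2:]
    json_data = json.loads(json_event)
    # This sketch leaves the event untouched if the key is missing.
    if key in json_data:
        json_data[key] = obj
    return json.dumps(json_data)


result = update_top_level_key(
    '{"k1":"v1","k2":"v2","k3":"v3"}', "replaced_value", "$.k2"
)
# Compare parsed JSON so that whitespace in the serialized string does not matter.
assert json.loads(result) == {"k1": "v1", "k2": "replaced_value", "k3": "v3"}
```

The comparison is done on parsed JSON because json.dumps with default settings adds spaces after separators, so the serialized string may differ in whitespace from the compact form shown above.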
Analysis
We went through the task-manager and job-manager logs during application startup. The third-party Python dependencies listed in requirements.txt are installed correctly in the task-manager container, but not in the job-manager container. If the dependency is installed manually in the job-manager container and the job is restarted, the job starts correctly and the UDF returns the expected result.
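The difference between the two containers can be confirmed with a small standard-library check like the one below, run with the same Python interpreter that Flink uses in each container. This is a sketch, not part of Dagger: the helper name missing_modules is hypothetical, and the module list is taken from the UDF's imports.

```python
import importlib.util
import sys


def missing_modules(module_names):
    """Return the top-level module names this interpreter cannot find."""
    return [
        name for name in module_names
        if importlib.util.find_spec(name) is None
    ]


if __name__ == "__main__":
    # Top-level import names used by the UDF (jsonpath-ng installs 'jsonpath_ng').
    required = ["jsonpath_ng"]
    print("interpreter:", sys.executable)
    print("missing modules:", missing_modules(required) or "none")
```

In our case we would expect this to report jsonpath_ng as missing in the job-manager container and nothing missing in the task-manager container.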
We are working on a use case that needs Python UDFs with third-party dependencies. Please reach out to us for more details.
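Since the traceback shows the failure at the module-level import on line 1 of json_update_jpath.py, one possible interim workaround is to move the third-party import inside the function body, so that importing the module in the job-manager no longer needs jsonpath_ng. This is an assumption on our side and has not been verified on Dagger. The sketch below leaves out the PyFlink decorator only so that it runs without PyFlink installed.

```python
import json

# In the real UDF module, the PyFlink imports and decorator from Step2 stay in place:
#   from pyflink.table import DataTypes
#   from pyflink.table.udf import udf
#   @udf(result_type=DataTypes.STRING())
def json_update_jpath(json_event, obj, jpath):
    # Deferred import: executed when the function is called,
    # not when the module is imported.
    from jsonpath_ng import parse

    json_data = json.loads(json_event)
    parse(jpath).update(json_data, obj)
    return json.dumps(json_data)
```

With this layout, importing the module succeeds even where jsonpath_ng is absent; the ModuleNotFoundError would only appear if the function is actually called in an environment without the dependency.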