丰富的第三方库使得python非常适合用于进行数据分析,最近在项目中就涉及到java调用python实现的算法。目前,java调用python主要包括三种方法:1、利用runtime在本地运行python脚本;2、通过jython调用;3、java通过rpc(远程过程调用)调用python。其中,runtime调用python实际上是通过cmd命令本地运行python脚本,对传递参数的长度有限制;而通过jython调用的方式无法导入numpy等第三方库。通过rpc远程调用的方式则不存在上述限制,是比较理想的方式。
1、通过runtime调用python
代码如下:
package pythonTest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Demonstrates calling a Python script from Java by launching a local
 * {@code python} process through {@link Runtime#exec(String[])}.
 *
 * <p>All inputs are passed as command-line arguments, so the total argument
 * length is bounded by the operating system's command-line limit.
 */
public class pythonTest {

    /**
     * Loads the echo data and the coefficient data, runs the 11.py script with
     * them as arguments, and prints the last line the script writes to stdout.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String filePath = ".\\src\\AverageEcho6211701.txt";
        String coePath = ".\\src\\OVlidar20190116.txt";
        String[] data = TxtUtil.getContentByLine(filePath, 3000);
        String[] coeData = TxtUtil.getContentByLine(coePath, 500);
        System.out.println(data[0]);
        int bearing = 90;
        double decCoe = 0.25;

        // Fixed leading arguments: interpreter, script, scalar parameters, joined echo data.
        String[] pythonArgs = new String[] {
            "python", ".\\src\\11.py", String.valueOf(bearing), String.valueOf(decCoe),
            String.join(",", data)
        };
        // Append every coefficient value as its own trailing argument.
        String[] appendPythonArgs = new String[pythonArgs.length + coeData.length];
        System.arraycopy(pythonArgs, 0, appendPythonArgs, 0, pythonArgs.length);
        System.arraycopy(coeData, 0, appendPythonArgs, pythonArgs.length, coeData.length);

        try {
            // Run the script with the FULL argument list; the original passed
            // pythonArgs here, which silently dropped the coefficient data.
            Process proc = Runtime.getRuntime().exec(appendPythonArgs);

            // The script's return value is taken to be the last line it prints.
            // NOTE(review): stderr is not consumed here; a script that writes a
            // lot to stderr could block on a full pipe.
            String result = null;
            try (BufferedReader in =
                    new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result = line;
                }
            }
            proc.waitFor();
            System.out.println(result);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
通过runtime调用实际上是通过cmd命令在本地运行python脚本,参数以字符串形式进行传递,当字符串超出一定长度时程序会报错。
java.io.IOException: Cannot run program "python": CreateProcess error=206, 文件名或扩展名太长。
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at java.lang.Runtime.exec(Runtime.java:620)
at java.lang.Runtime.exec(Runtime.java:485)
at pythonTest.pythonTest.main(pythonTest.java:22)
Caused by: java.io.IOException: CreateProcess error=206, 文件名或扩展名太长。
at java.lang.ProcessImpl.create(Native Method)
at java.lang.ProcessImpl.<init>(ProcessImpl.java:386)
at java.lang.ProcessImpl.start(ProcessImpl.java:137)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 3 more
2、jython
jython是在jvm上实现的python,由java编写。jython将python源码编译成jvm字节码,由jvm执行对应的字节码。使用前需要在pom文件中导入相关的jar包:
<!-- https://mvnrepository.com/artifact/org.python/jython-standalone -->
<!-- Jython 2.7.1 standalone artifact; supplies the org.python.core and
     org.python.util classes imported by the Java example in this section. -->
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.1</version>
</dependency>
相关代码如下:
package pythonTest;
import org.python.core.*;
import org.python.util.PythonInterpreter;
/**
 * Demonstrates calling a Python function from Java through an embedded
 * Jython {@link PythonInterpreter}.
 */
public class pythonTest {

    /**
     * Executes the 11.py script inside Jython, looks up its {@code add}
     * function and calls it with two integers.
     *
     * @param args unused
     * @throws IllegalStateException if the script does not define {@code add}
     */
    public static void main(String[] args) {
        String filePath = ".\\src\\main\\resources\\AverageEcho6211701.txt";
        String[] data = TxtUtil.getContentByLine(filePath, 3000);
        System.out.println(data[0]);

        // try-with-resources releases the interpreter's resources even when
        // the script raises (for example on a failed import).
        try (PythonInterpreter interpreter = new PythonInterpreter()) {
            interpreter.exec("import sys");
            interpreter.execfile(".\\src\\main\\python\\11.py");

            // First argument: name of the function (or variable) to fetch;
            // second argument: the Java type it should be returned as.
            PyFunction pyFunction = interpreter.get("add", PyFunction.class);
            if (pyFunction == null) {
                throw new IllegalStateException("Python script does not define a function named 'add'");
            }

            int a = 5, b = 10;
            // Arguments must be converted to the matching Python types before the call.
            PyObject pyobj = pyFunction.__call__(new PyInteger(a), new PyInteger(b));
            System.out.println("the anwser is: " + pyobj);
        }
    }
}
当python脚本中需要导入第三方库时程序报错:
Exception in thread "main" Traceback (most recent call last):
File ".\src\main\python\11.py", line 2, in <module>
import numpy
ImportError: No module named numpy
3、rpc远程调用python
rpc框架实际提供了一套机制,使得应用程序之间可以进行通信,采用server/client模型,客户端调用服务端接口。目前开源的rpc框架有Dubbo、Thrift、grpc等,其中dubbo仅支持java,thrift与grpc支持多语言,使用grpc实现java调用python程序,java实现客户端,python实现服务端。grpc具有以下特点:1、grpc通过protobuf定义接口,可以将数据序列化成二进制编码,大幅减少需要传输的数据量,从而提高性能;2、grpc支持流式场景。
首先,定义protobuf接口:
syntax = "proto3";
package com.wayealCloud.mlp;
// Service exposing a single RPC, implemented by the Python server and
// called from the Java client.
service MlpAlgorithm
{
// Unary call: takes one Request and returns one Result.
rpc mlpInverting(Request) returns (Result){}
}
// Input of mlpInverting. Every field travels as a string and is parsed by the server.
message Request
{
// Comma-separated records; the Python server splits each record on spaces
// and reads columns 1 and 2 of the resulting table.
string data = 1;
// Comma-separated numeric values; the Python server converts them to a float32 array.
string coeData = 2;
// Parsed with int() by the Python server.
string bearing = 3;
// Parsed with float() by the Python server.
string decCoe = 4;
}
// Output of mlpInverting.
message Result
{
// The Python server fills this with str(er[0]), i.e. the string form of the
// first element of the first value returned by the inversion algorithm.
string exData = 1;
}
在proto文件所在目录下进行编译:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./mlp.proto
然后,编写python服务端程序,代码如下:
# -*- coding: utf-8 -*-
"""
@author: yhl
@time: 2019/09/16
激光雷达反演算法
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ETLandTraining.spark_code.micro_pulse_lidar.micro_pulse_lidar import SectionalSlopeAlgorithm, \
FernaldAlgorithm, WaveletDenoising, move_avg, fanyan_algorithm
from concurrent import futures
from src.ETLandTraining.spark_code.micro_pulse_lidar.src import mlp_pb2_grpc, mlp_pb2
import numpy as np
import math
import matplotlib.pyplot as plt
import time
import grpc
# Sleep interval, in seconds, used by serve() to keep the main thread alive.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Host part of the address the gRPC server binds to.
_HOST = '192.168.200.234'
# NOTE(review): despite its name this holds the listening PORT (as a string);
# serve() formats the bind address as "<_HOST>:<IP>".
IP = "9000"
class ServiceMain(mlp_pb2_grpc.MlpAlgorithmServicer):
    """gRPC servicer that exposes the lidar inversion algorithm as ``mlpInverting``."""

    def mlpInverting(self, request, context):
        """Parse the string-encoded request, run the inversion and return its first value.

        Args:
            request: ``Request`` message. ``data`` is a comma-separated list of
                whitespace-separated records, ``coeData`` a comma-separated list
                of numbers, ``bearing`` an integer string and ``decCoe`` a float
                string.
            context: gRPC servicer context; used to abort the call with
                ``INVALID_ARGUMENT`` when the request cannot be parsed.

        Returns:
            ``Result`` whose ``exData`` is ``str(er[0])``, where ``er`` is the
            first value returned by ``fanyan_algorithm``.
        """
        try:
            # split() with no argument tolerates repeated spaces and tabs,
            # which split(' ') would turn into empty, unparsable tokens.
            rows = [record.split() for record in request.data.split(',')]
            data = np.array(rows).astype('float32')
            ovlidar = np.array(request.coeData.split(',')).astype('float32')
            bearing = int(request.bearing)
            dec_coe = float(request.decCoe)
            # Columns 1 and 2 of each record are the two inputs of the algorithm.
            p1 = data[:, 1]
            p2 = data[:, 2]
        except (ValueError, IndexError) as exc:
            # abort() raises, so execution never continues past this point;
            # the client sees INVALID_ARGUMENT instead of an opaque UNKNOWN.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT,
                          "malformed mlpInverting request: {}".format(exc))

        er, _dec_ratio = fanyan_algorithm(bearing, p1, p2, ovlidar, dec_coe)
        return mlp_pb2.Result(exData=str(er[0]))
def serve():
    """Start the gRPC server on ``_HOST:IP`` and block until interrupted.

    Registers ``ServiceMain`` on a server backed by a 10-worker thread pool,
    binds an insecure (plaintext) port, then sleeps in a loop to keep the main
    thread alive. A ``KeyboardInterrupt`` stops the server immediately.

    Raises:
        RuntimeError: if the address cannot be bound.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    mlp_pb2_grpc.add_MlpAlgorithmServicer_to_server(ServiceMain(), grpc_server)

    address = "{}:{}".format(_HOST, IP)
    # add_insecure_port returns the bound port; grpcio releases that do not
    # raise on a failed bind report the failure by returning 0. Without this
    # check the server would "start" without listening anywhere.
    if grpc_server.add_insecure_port(address) == 0:
        raise RuntimeError("failed to bind gRPC server to {}".format(address))

    grpc_server.start()
    print("grpc service start")
    try:
        # start() does not block, so keep the main thread alive.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)


# Start the server only when this file is run as a script.
if __name__ == '__main__':
    serve()
最后,编写java客户端程序。第一步,定义protobuf接口文件且必须与python端保持一致;第二步,在pom文件中添加相关的jar包及插件;第三步,编译及编写客户端程序。pom文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build for the Java gRPC client: pulls in the gRPC runtime and
     generates Java sources from the .proto file at build time. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.wayeal.cloud.mlp</groupId>
<artifactId>mlp</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://github.com/grpc/grpc-java-->
<!-- gRPC runtime artifacts. All three are pinned to 1.23.0, the same version
     as the protoc-gen-grpc-java plugin configured in the build section. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.23.0</version>
</dependency>
<!-- Test-scope only; not needed at runtime. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<!-- Provides the ${os.detected.classifier} property referenced by the
     protoc and grpc-java plugin coordinates below. -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<!-- Code generation from .proto files: the "compile" goal runs protoc for the
     message classes, "compile-custom" runs the grpc-java plugin for the
     service stubs. -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.9.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.23.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Compile sources and emit bytecode at the Java 8 level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
客户端程序如下:
package com.wayealCloud.mlp.mlpClient;
import com.wayealCloud.mlp.util.TxtUtil;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
/**
 * Blocking gRPC client for the {@code MlpAlgorithm} service implemented by
 * the Python server.
 */
public class MlpClient {
    private final ManagedChannel channel;
    private final MlpAlgorithmGrpc.MlpAlgorithmBlockingStub blockingStub;

    /**
     * Opens a plaintext (no TLS) channel to the server and creates a blocking stub on it.
     *
     * @param host server host name or address
     * @param port server port
     */
    public MlpClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .build();
        blockingStub = MlpAlgorithmGrpc.newBlockingStub(channel);
    }

    /**
     * Shuts the channel down and waits up to 5 seconds for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Sends one {@code mlpInverting} request, prints the result and returns it.
     *
     * @param data    value for the request's {@code data} field
     * @param coeData value for the request's {@code coeData} field
     * @param bearing value for the request's {@code bearing} field
     * @param decCoe  value for the request's {@code decCoe} field
     * @return the {@code exData} field of the server's reply
     */
    public String greet(String data, String coeData, String bearing, String decCoe) {
        Request request = Request.newBuilder()
                .setData(data)
                .setCoeData(coeData)
                .setBearing(bearing)
                .setDecCoe(decCoe)
                .build();
        Result result = blockingStub.mlpInverting(request);
        System.out.println(result.getExData());
        return result.getExData();
    }

    /**
     * Loads the sample input files, performs a single remote call and shuts the client down.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while the channel is shutting down
     */
    public static void main(String[] args) throws InterruptedException {
        String filePath = ".\\src\\main\\resources\\AverageEcho6211701.txt";
        String coePath = ".\\src\\main\\resources\\OVlidar20190116.txt";
        String[] data = TxtUtil.getContentByLine(filePath, 3000);
        String[] coeData = TxtUtil.getContentByLine(coePath, 500);
        String bearing = "90";
        String decCoe = "0.25";

        MlpClient client = new MlpClient(Constants.IP, Constants.port);
        try {
            // greet() already prints the result, so the return value is not needed here.
            client.greet(String.join(",", data), String.join(",", coeData), bearing, decCoe);
        } finally {
            // Always release the channel, even when the call fails; the
            // original never shut it down.
            client.shutdown();
        }
    }
}
结果如下:
0.3565467136205529
综上所述,基于grpc,java实现客户端,python实现服务端,通过远程调用的方式是比较理想的方法。