先撇开spark,咱们先看下在python里怎么往kafka里写数据,这里用的是一个包是kafka-python,和java版的kafka-client一样,它是python版的kafka客户端,官方地址是:kafka-python
环境准备
- 安装kafka-python包(方式一)
pip install kafka-python // 前提是机器上你已经安装了pip
- 安装kafka-python包(方式二)
下载kafka-python包:https://pypi.org/project/kafka-python/#files
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
python setup.py install // 安装
使用方式
- KafkaProducer
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['localhost:port'])
# Asynchronous by default
future = producer.send('my_topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
- KafkaConsumer
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:port'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
接着我们看下在spark中,如何运行python脚本,并把数据写入kafka
Local 模式
- 环境准备
// 参见上面,保证spark-client使用的python环境里安装上kafka-python包即可
- 提交命令
spark/bin/spark-submit \
--master local[4] \
demo.py
所以在本地模式中,我们只要保证本地用到的python环境里面安装上kafka-python就行了,提交命令上没有啥改变,是不是非常简单
Standalone or Yarn 模式
- 环境准备
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
zip -r kafka.zip ./kafka // 把kafka这个源码包打包成zip
- 提交命令
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--driver-memory 1G --executor-cores 4 --driver-cores 1 --total-executor-cores 120 --executor-memory 4G \
--py-files /home/work/kafka.zip \
demo.py
在集群模式中,因为最终任务会被提交到集群的机器上执行,依赖的是集群机器上的python环境,所以仅仅本地安装kafka-python包是没有作用的,必须通过--py-files参数把本地依赖的包带上去
补充说明:spark在运行python脚本时,常常需要带上一些依赖包,这里简单说明一下
依赖单个文件
- 使用sc.addPyFiles(path)函数,代码里实现文件添加到SparkContext中
sc = SparkContext()
sc.addPyFile(file_path)
- 命令中使用--py-files参数指定文件路径
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files dependency.py \
demo.py
依赖自己写的其他模块
将自己写的模块打成一个zip包,然后当做单个文件使用。假设你有一个自己写的包叫util,里面有两个文件:init.py和util.py,然后你的demo.py里需要from util import util,你就可以直接打成util.zip包使用
zip -r util.zip ./util // 打包成zip
- 同样使用sc.addPyFiles(path)函数,代码里实现文件添加到SparkContext中
sc = SparkContext()
sc.addPyFile(file_path)
- 或者是命令中使用--py-files参数指定文件路径
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files util.zip \
demo.py
依赖三方模块
像上文中说的kafka-python就是一个三方的包,但是这个包提供了源码,这个时候就可以直接把源码打包成zip,然后像调用自己写的模块包一样引用进去
- 能下载到依赖的tar包
直接找到源码,然后打包使用
tar xzvf kafka-python-1.4.3.tar.gz // 解压
cd kafka-python-1.4.3 // 切换到包目录
zip -r kafka.zip ./kafka // 把kafka这个源码包打包成zip
- 其他方式安装
这种一般看不到源码包,譬如执行pip install numpy。我们知道,第三方的包一般会被安装到$python_path/lib/python2/site-packages目录下面,这时就可以到这个目录下,把需要的包打包成zip。然后通过上面说的方式使用
注意:--py-files参数是可以加入多个文件的,多个文件之间以逗号分隔即可
spark/bin/spark-submit \
--master spark://192.168.1.1:8888 \
--py-files dependency1.zip,dependency2.zip,dependency.py \
demo.py