Python: connecting to HDFS remotely, uploading files, reading file contents, and deleting files

Contents

I. Connecting to and operating on HDFS from Python

1 Uploading files to HDFS

2 Processing data and storing it on HDFS

3 Reading a txt file from HDFS


I. Connecting to and operating on HDFS from Python

The hdfs package is installed with pip, which is convenient:

pip install hdfs
The Client class then covers the basic operations:

from hdfs.client import Client

client = Client("http://LocalHost:Port")

client.makedirs('/ml/zmingmingmng')                                # create a directory
client.delete('/ml/zmming')                                        # delete a directory
client.upload("/ml/zmingmingmng/zm.txt", "E:/ttt/testhdfs.txt")    # upload a file
client.download("/ml/zmingmingmng/zm.txt", "E:/ming.txt")          # download a file
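Note that a bare Client carries no user identity, so writes can fail with permission errors on a cluster that enforces ownership. A minimal sketch of an authenticated connection, assuming the hdfs package's InsecureClient and a placeholder "hadoop" user:

from hdfs import InsecureClient

# InsecureClient sends a user name with every WebHDFS request (no Kerberos);
# "hadoop" is a placeholder for whichever account owns the target paths.
client = InsecureClient("http://LocalHost:Port", user="hadoop")
print(client.status("/"))    # smoke test: stat the HDFS root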
A fuller set of wrappers around the common client operations:

# -*- encoding=utf-8 -*-
from hdfs.client import Client

client = Client("http://XXX.XXX.XX.XX:50070")

# Create a directory
def mkdirs(client, hdfs_path):
    client.makedirs(hdfs_path)

# Delete an HDFS file
def delete_hdfs_file(client, hdfs_path):
    client.delete(hdfs_path)

# Upload a local file to HDFS
def put_to_hdfs(client, local_path, hdfs_path):
    client.upload(hdfs_path, local_path, cleanup=True)

# Download an HDFS file to the local filesystem
def get_from_hdfs(client, hdfs_path, local_path):
    client.download(hdfs_path, local_path, overwrite=False)

# Append data to an HDFS file
def append_to_hdfs(client, hdfs_path, data):
    client.write(hdfs_path, data, overwrite=False, append=True)

# Overwrite an HDFS file with data
def write_to_hdfs(client, hdfs_path, data):
    client.write(hdfs_path, data, overwrite=True, append=False)

# Move or rename a file
def move_or_rename(client, hdfs_src_path, hdfs_dst_path):
    client.rename(hdfs_src_path, hdfs_dst_path)

# List the files under a directory (note: shadows the built-in list())
def list(client, hdfs_path):
    return client.list(hdfs_path, status=False)

if __name__ == '__main__':
    kk = list(client, "/user/admin/deploy/user_lable_dimension/")
    for each in kk:
        print(each)
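The wrappers above cover writes, moves, and listing but not reads; one possible companion, assuming the same hdfs.client.Client instance (read is the package's streaming context manager):

# Hypothetical read helper to round out the wrappers above
def read_from_hdfs(client, hdfs_path):
    """Return the full contents of an HDFS text file as a string."""
    with client.read(hdfs_path, encoding='utf-8') as reader:
        return reader.read()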

 

1 Uploading files to HDFS

from hdfs.client import Client

# Upload a file to HDFS
client = Client("http://XXX.XXX.XX.XX:50070")

# Create the target directory
hdfs_path = "[directory to store the file under, e.g. /a/b/c]"
client.makedirs(hdfs_path)

print("uploading data...")
client.upload(hdfs_path, "intersection.xlsx", overwrite=True)    # file exported from the resource center
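upload can also push a whole local directory in one call; a sketch, assuming a hypothetical local folder data/ (n_threads is part of the package's upload signature and parallelizes the per-file transfers):

from hdfs.client import Client

client = Client("http://XXX.XXX.XX.XX:50070")

# Upload the local "data/" directory recursively; overwrite=True replaces
# any files that already exist under the target path.
client.upload("/a/b/data", "data/", n_threads=4, overwrite=True)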

 

2 Processing data and storing it on HDFS

# TODO: first produce the result list, e.g. i_list,
# then save it as a file and upload it to HDFS.
import pandas as pd
from hdfs.client import Client

print("===============================================")
i_df = pd.DataFrame(i_list)
client = Client("http://XXX.XXX.XX.XX:50070")

fout = "[path to store the file at, e.g. /a/b/c.csv]"    # path on HDFS
with client.write(fout, encoding='utf-8') as writer:
    i_df.to_csv(writer)
print("saved successfully")
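The reverse direction works the same way: read yields a file-like reader that pandas can consume directly. A sketch, assuming the CSV written above:

import pandas as pd
from hdfs.client import Client

client = Client("http://XXX.XXX.XX.XX:50070")

# Stream the CSV straight back into a DataFrame; the reader behaves like
# a file object, so no local copy is needed.
with client.read("/a/b/c.csv", encoding='utf-8') as reader:
    df = pd.read_csv(reader)
print(df.head())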

 

3 Reading a txt file from HDFS

This example detects a local file's encoding with chardet, reads a file from HDFS line by line, and finally deletes it. Note that it uses pyhdfs (pip install pyhdfs) rather than the hdfs package used above:

import chardet
import pyhdfs

def GetEncodingScheme(_filename):
    """Detect the encoding of a local file."""
    with open(_filename, 'rb') as file:
        buf = file.read()
    result = chardet.detect(buf)
    return result['encoding']

def read_hdfs_file(client, filename):
    """Read an HDFS file and return its lines as a list."""
    print("reading txt data...")
    data = client.open(filename).read()    # pyhdfs.open returns a file-like object
    return [line.strip() for line in data.decode("GB2312").splitlines()]

def deleteHDFSfile(client, hdfs_path):
    """Delete an HDFS file; a directory must be empty before it can be deleted."""
    client.delete(hdfs_path)

if __name__ == "__main__":
    print(GetEncodingScheme('intersection.xlsx'))    # GB2312

    # Connect to HDFS; pyhdfs takes a comma-separated "host:port" list (no http:// prefix)
    client = pyhdfs.HdfsClient(hosts="xxxxxx:50070,xxxxxx:50070", user_name="xxxxxx")

    # Read the HDFS file and collect its lines into a list
    hdfs_path = "[file path, e.g. /a/b/c.xlsx]"    # path on HDFS
    print("===============================================")
    print("reading the txt file from HDFS")
    lines = read_hdfs_file(client, hdfs_path)
    print(lines)
    print("done reading")
    print("===============================================")

    # Delete the file under the HDFS directory
    hdfs_path = "[file path]"
    deleteHDFSfile(client, hdfs_path)
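delete refuses a non-empty directory by default. WebHDFS accepts a recursive flag on DELETE, and pyhdfs forwards extra keywords as query parameters; a sketch, assuming the same HdfsClient (use with care, as it removes everything under the path):

import pyhdfs

client = pyhdfs.HdfsClient(hosts="xxxxxx:50070", user_name="xxxxxx")

# recursive=True is forwarded to WebHDFS, deleting the directory
# and all of its contents in one call.
client.delete("/a/b", recursive=True)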