目录
这里使用的是 pip 安装,很方便:
pip install hdfs
from hdfs.client import Client client = Client("http://LocalHost:Port") client.makedirs('/ml/zmingmingmng')#建立文件夹 client.delete('/ml/zmming')#删除文件夹 client.upload("/ml/zmingmingmng/zm.txt","E:/ttt/testhdfs.txt")#上传文件 client.download("/ml/zmingmingmng/zm.txt","E:/ming.txt")#下载文件
# -*- encoding=utf-8 -*- from hdfs.client import Client client = Client("http://XXX.XXX.XX.XX:50070") # 创建目录 def mkdirs(client, hdfs_path): client.makedirs(hdfs_path) # 删除hdfs文件 def delete_hdfs_file(client, hdfs_path): client.delete(hdfs_path) # 上传文件到hdfs def put_to_hdfs(client, local_path, hdfs_path): client.upload(hdfs_path, local_path, cleanup=True) # 从hdfs获取文件到本地 def get_from_hdfs(client, hdfs_path, local_path): client.download(hdfs_path, local_path, overwrite=False) # 追加数据到hdfs文件 def append_to_hdfs(client, hdfs_path, data): client.write(hdfs_path, data, overwrite=False, append=True) # 覆盖数据写到hdfs文件 def write_to_hdfs(client, hdfs_path, data): client.write(hdfs_path, data, overwrite=True, append=False) # 移动或者修改文件 def move_or_rename(client, hdfs_src_path, hdfs_dst_path): client.rename(hdfs_src_path, hdfs_dst_path) # 返回目录下的文件 def list(client, hdfs_path): return client.list(hdfs_path, status=False) if __name__ == '__main__': # 调用 kk=list(client,"/user/admin/deploy/user_lable_dimension/") for each in kk: print(each)
1 往hdfs上传文件
from hdfs.client import Client """往hdfs上传文件""" # TODO 往hdfs上传文件 client = Client("http://XXX.XXX.XX.XX:50070") # 新建文件夹 hdfs_path ="【文件要存放的目录路径,eg:/a/b/c】" client.makedirs(hdfs_path) print("uploading data...") client.upload(hdfs_path, "intersection.xlsx", overwrite=True) # 资源中心上传的文件
2 处理并存储到hdfs
# TODO 先得到结果列表。eg:i_list # TODO 把结果列表存储成文件上传到hdfs print("===============================================") i_df = pd.DataFrame(i_list) client = Client("http://XXX.XXX.XX.XX:50070") fout = "【文件要存放的路径,eg:/a/b/c.csv】" # hdfs下的目录 with client.write(fout, encoding='utf-8') as writer: i_df.to_csv(writer) print("存储成功")
3 读取hdfs上的txt文件
from hdfs.client import Client import json from kafka import KafkaConsumer import time import pyhdfs def GetEncodingSheme(_filename): """ 查看文本编码方式 """ with open(_filename, 'rb') as file: buf = file.read() result = chardet.detect(buf) return result['encoding'] def read_hdfs_file(client, filename): """读取hdfs文件内容,将每行存入数组返回""" lines = [] print("开始读取txt数据") with client.open(filename, delimiter='\n') as reader: for line in reader: lines.append(line.decode("GB2312").strip()) return lines def deleteHDFSfile(client, hdfs_path): """删除hdfs文件,删除文件夹时该文件夹必须为空""" client.delete(hdfs_path) if __name__ == "__main__": print(GetEncodingSheme('intersection.xlsx')) # GB2312 # hdfs连接 client = pyhdfs.HdfsClient(hosts="http://xxxxxx:50070,http://xxxxxx:50070", user_name="xxxxxx") # TODO 读取hdfs文件内容,将每行存入数组返回 hdfs_path = "【文件路径,eg:/a/b/c.xlsx】" # hdfs存储目录 print("===============================================") print("开始读取hdfs上的txt文件") lines = read_hdfs_file(client, hdfs_path) print(lines) print("读取完成") print("===============================================") # TODO 删除hdfs存储目录下的文件 hdfs_path = "【文件路径】" deleteHDFSfile(client, hdfs_path)
热门文章
- Objective-C 基础教程第九章,内存管理
- 啥属相不能养狗(哪个属相不能养狗)
- 2月1日最新Clash-X订阅 | 19.1M/S|2025年Clash/SSR/V2ray/Shadowrocket免费节点地址链接分享
- 自己给宠物打疫苗能开检疫证明吗怎么开(自己帮宠物打疫苗)
- 2月21日最新Free Clash Meta订阅 | 21M/S|2025年Shadowrocket/SSR/Clash/V2ray免费节点地址链接分享
- 2月3日最新Free Clash Meta订阅 | 22.7M/S|2025年Shadowrocket/SSR/Clash/V2ray免费节点地址链接分享
- 动物常用的免疫接种方法(动物免疫接种实验原理)
- Android中Handler的使用方法
- 动物医院诊疗管理系统官网登录(动物医院诊疗范围)
- 襄阳流浪宠物领养中心在哪里?(最新襄阳市流浪狗救助站地址)
归纳
-
75 2025-02