Install dependencies¶
!pip install googletrans
Requirement already satisfied: googletrans in /usr/local/lib/python3.6/dist-packages (3.0.0)
Requirement already satisfied: httpx==0.13.3 in /usr/local/lib/python3.6/dist-packages (from googletrans) (0.13.3)
Requirement already satisfied: idna==2.* in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (2.10)
Requirement already satisfied: certifi in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (2020.11.8)
Requirement already satisfied: chardet==3.* in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (3.0.4)
Requirement already satisfied: hstspreload in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (2020.11.21)
Requirement already satisfied: rfc3986<2,>=1.3 in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (1.4.0)
Requirement already satisfied: httpcore==0.9.* in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (0.9.1)
Requirement already satisfied: sniffio in /usr/local/lib/python3.6/dist-packages (from httpx==0.13.3->googletrans) (1.2.0)
Requirement already satisfied: h2==3.* in /usr/local/lib/python3.6/dist-packages (from httpcore==0.9.*->httpx==0.13.3->googletrans) (3.2.0)
Requirement already satisfied: h11<0.10,>=0.8 in /usr/local/lib/python3.6/dist-packages (from httpcore==0.9.*->httpx==0.13.3->googletrans) (0.9.0)
Requirement already satisfied: contextvars>=2.1; python_version < "3.7" in /usr/local/lib/python3.6/dist-packages (from sniffio->httpx==0.13.3->googletrans) (2.4)
Requirement already satisfied: hpack<4,>=3.0 in /usr/local/lib/python3.6/dist-packages (from h2==3.*->httpcore==0.9.*->httpx==0.13.3->googletrans) (3.0.0)
Requirement already satisfied: hyperframe<6,>=5.2.0 in /usr/local/lib/python3.6/dist-packages (from h2==3.*->httpcore==0.9.*->httpx==0.13.3->googletrans) (5.2.0)
Requirement already satisfied: immutables>=0.9 in /usr/local/lib/python3.6/dist-packages (from contextvars>=2.1; python_version < "3.7"->sniffio->httpx==0.13.3->googletrans) (0.14)
!pip install jieba
Requirement already satisfied: jieba in /usr/local/lib/python3.6/dist-packages (0.42.1)
!pip install snownlp
Requirement already satisfied: snownlp in /usr/local/lib/python3.6/dist-packages (0.12.3)
!pip install pyspark
Requirement already satisfied: pyspark in /usr/local/lib/python3.6/dist-packages (3.0.1)
Requirement already satisfied: py4j==0.10.9 in /usr/local/lib/python3.6/dist-packages (from pyspark) (0.10.9)
!git clone https://github.com/goto456/stopwords.git
fatal: destination path 'stopwords' already exists and is not an empty directory.
Read stop words¶
# Load the HIT stop-word list (one stop word per line) for later content cleaning.
# Encoding is explicit: the file is Chinese text and must be read as UTF-8.
with open('/content/stopwords/hit_stopwords.txt', encoding='utf-8') as f:
    stop_words = [line.rstrip('\n') for line in f]
print(stop_words[:10])
['———', '》),', ')÷(1-', '”,', ')、', '=(', ':', '→', '℃ ', '&']
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import Row
# NLP Module
from snownlp import SnowNLP
import jieba
# translation
from googletrans import Translator
import matplotlib.pyplot as plt
import pandas as pd
# Reuse the notebook's existing SparkContext if one is already running.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
import urllib.request
# Download the scraped Sina Weibo trending-keywords dataset (CSV) from a GitHub release
# into the working directory; all later reads use the local copy.
file_url = "https://github.com/sirily11/hot-keywords/releases/download/master/Sina_keywords--12.03.2020.csv"
file_name = "Sina_keyword.csv"
urllib.request.urlretrieve(file_url, file_name)
Read csv file¶
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
# Explicit schema: trending keyword, post content, timestamp, trending rank, engagement count.
# All fields declared non-nullable (third StructField argument).
schema = StructType([
    StructField('keyword', StringType(), False),
    StructField("content", StringType(), False),
    StructField("time", TimestampType(), False),
    StructField("rank", IntegerType(), False),
    StructField("numbers", IntegerType(), False),
])
# NOTE(review): lineSep=";" means records are ';'-delimited rather than newline-delimited —
# presumably because post content contains embedded newlines; confirm against the raw CSV.
df = spark.read.csv(file_name, schema=schema, lineSep=";")
df.show()
# Drop to the RDD API for the map/filter/groupByKey pipeline below.
rdd = df.rdd
rdd.take(10)
from time import sleep
Translation¶
# Re-read the same CSV with pandas for the translation step; skip malformed rows.
data = pd.read_csv("Sina_keyword.csv", error_bad_lines=False, names=['Keyword', 'Content', 'Time', 'Rank', 'Number'])
# Translate only the first 100 rows — presumably to bound calls to the free
# translation API; TODO confirm.
data = data[:100]
from time import sleep
from tqdm.auto import tqdm
tqdm.pandas()  # enables DataFrame.progress_apply (apply with a progress bar)
translator = Translator()
translator.translate("你好").text  # smoke test: verify the translation API responds
def translate(index: str, content: str, max_retries: int = 60):
    """Translate *content* via the module-level googletrans ``translator``.

    googletrans intermittently fails (rate limiting / transient network
    errors), so each failure is retried after a 1-second pause.  The original
    version retried forever, hanging the notebook on a persistent outage; the
    retry count is now bounded.

    Args:
        index: row label from ``progress_apply`` (unused; kept for the caller's
            signature).
        content: text to translate.
        max_retries: attempts before giving up (default 60 ≈ one minute).

    Returns:
        The googletrans ``Translated`` object (callers use ``.text``).

    Raises:
        RuntimeError: if every attempt failed.
    """
    for _ in range(max_retries):
        try:
            return translator.translate(content)
        except Exception:
            # Transient API error — back off briefly and retry.
            sleep(1)
    raise RuntimeError(f"translation failed after {max_retries} attempts")
# Translate every post's content with a progress bar.
# NOTE(review): this stores googletrans Translated objects in the column, not
# plain strings — downstream consumers need ``.text``; confirm this is intended.
data['Content_Translation'] = data.progress_apply(lambda row: translate(row.name, row['Content']), axis=1)
data.head(10)
preprocess data¶
def preprocess(row):
    """Return a copy of *row* with boilerplate markers, the trending keyword
    itself, and all stop words stripped from its ``content`` field.

    Rows whose content is empty/None pass through unchanged.
    """
    d = row.asDict()
    if d['content']:
        # Weibo UI artifacts and the keyword itself, removed in this order.
        noise_tokens = (
            "展开全文c",
            "收起全文d",
            str(d['keyword']),
            "#",
            "\n",
            'O网页链接',
        )
        for token in noise_tokens:
            d['content'] = d['content'].replace(token, "")
        # Drop every stop word loaded earlier (module-level ``stop_words``).
        for word in stop_words:
            d['content'] = d['content'].replace(word, '')
    return Row(**d)
# Clean every row, inspect a sample, and cache the result for the multiple
# actions run on it below.
new_data = rdd.map(preprocess)
print(new_data.take(10))
new_data.cache()
# Eagerly load jieba's dictionary up front instead of on first use.
jieba.initialize()
tokenizer = jieba.Tokenizer()  # NOTE(review): created but never used below
def sentiment(row):
    """Score one post with SnowNLP.

    Returns ``(keyword, (sentiment_score, content))`` on success, or the
    sentinel ``("error", "nil")`` on any failure so bad rows can be filtered
    out downstream.
    """
    try:
        score = SnowNLP(row.content).sentiments
        result = (row.keyword, (score, row.content))
    except Exception:
        # Best-effort: any failure (e.g. empty/None content) becomes a sentinel.
        return ("error", "nil")
    return result
def keyword(row):
    """Tokenize the row's trending keyword with jieba (precise mode) and
    return the tokens together with the original row."""
    tokens = jieba.lcut(row.keyword, cut_all=False)
    return tokens, row
# Score every post and drop rows where SnowNLP failed (the ("error", "nil") sentinel).
sentiments = new_data.map(sentiment).filter(lambda x: x[0] != "error" and x[1] != "nil")
sentiments.cache().take(10)
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-29-7e85580c7ffe> in <module>()
13
14 sentiments = new_data.map(sentiment).filter(lambda x: x[0] != "error" and x[1] != "nil")
---> 15 sentiments.cache().take(10)
/usr/local/lib/python3.6/dist-packages/pyspark/rdd.py in take(self, num)
1444
1445 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1446 res = self.context.runJob(self, takeUpToNumLeft, p)
1447
1448 items += res
/usr/local/lib/python3.6/dist-packages/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
1116 # SparkContext#runJob.
1117 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1118 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1119 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1120
/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1301 proto.END_COMMAND_PART
1302
-> 1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
1031 connection = self._get_connection()
1032 try:
-> 1033 response = connection.send_command(command)
1034 if binary:
1035 return response, self._create_connection_guard(connection)
/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py in send_command(self, command)
1198
1199 try:
-> 1200 answer = smart_decode(self.stream.readline()[:-1])
1201 logger.debug("Answer received: {0}".format(answer))
1202 if answer.startswith(proto.RETURN_MESSAGE):
/usr/lib/python3.6/socket.py in readinto(self, b)
584 while True:
585 try:
--> 586 return self._sock.recv_into(b)
587 except timeout:
588 self._timeout_occurred = True
KeyboardInterrupt:
Write sentiment results to local storage
sentiments.
def groupValues(values):
    """Return the mean sentiment score over one keyword's posts.

    Args:
        values: iterable of ``(sentiment_score, content)`` pairs, as produced
            by ``sentiments.groupByKey()`` for a single keyword.

    Returns:
        The arithmetic mean of the scores.

    Raises:
        ZeroDivisionError: if *values* is empty (cannot happen after
            ``groupByKey``, which only emits keys that have at least one value).
    """
    scores = [score for score, _content in values]
    return sum(scores) / len(scores)
# Average sentiment per trending keyword.
grouped = sentiments.groupByKey().mapValues(groupValues)
print(grouped.take(10))
# Bucket posts by SnowNLP score: > 0.75 positive, <= 0.35 negative, in between neutral.
pos = sentiments.filter(lambda x: x[1][0] > 0.75).count()
neg = sentiments.filter(lambda x: x[1][0] <= 0.35).count()
net = sentiments.filter(lambda x: x[1][0] > 0.35 and x[1][0] <= 0.75).count()
# Bar chart of the three sentiment buckets, saved to disk.
plt.figure(figsize=(10,10))
plt.bar("Positive", pos)
plt.bar("Neutral", net)
plt.bar("Negative", neg)
plt.xlabel("Sentiment")
plt.ylabel("Number of posts")
plt.title("Sina Weibo Sentiment")
plt.savefig("sina.png")