SoFunction
Updated on 2024-10-30

python3 implementation of user-based collaborative filtering

This article shares a complete Python 3 implementation of user-based collaborative filtering for your reference. The details are as follows.

Without further ado, let's get right to the code.

#!/usr/bin/python3 
# -*- coding: utf-8 -*- 
# 2017-09-16: collaborative-filtering movie recommendation, base draft
# Processes the rating data into dictionaries (and similar structures) and writes them directly to files
 
 
##from numpy import * 
import time 
from math import sqrt 
##from texttable import Texttable 
 
 
class CF:
    """User-based collaborative-filtering recommender for movie ratings.

    NOTE(review): the published listing had lost the names of most instance
    attributes (statements such as `` = {}``).  ``userDict``, ``ItemUser``,
    ``neighbors``, ``recommandList`` and ``recommand`` follow the listing's
    own comments; ``movies``, ``k``, ``n``, ``forecast``, ``score``,
    ``precision`` and ``recall`` are best-effort reconstructions -- confirm
    them against the original source.

    Working data (rebuilt by ``formatRate`` for whichever split is active):

    * ``userDict``: ``{user_id: [(movie_id, rating), ...]}``, e.g.
      ``{'3': [('3421', 0.8), ('1641', 0.4), ...]}``
    * ``ItemUser``: ``{movie_id: [user_id, ...]}``, e.g.
      ``{'1653': ['5', '8', '9'], '194': ['5'], ...}``
    """

    def __init__(self, movies, ratings, k=5, n=20):
        """Store the raw data and initialise all working state.

        Args:
            movies: movie rows, ``[MovieID, Title, Genres]``.
            ratings: pair ``(train_rows, test_rows)``; each row is
                ``[UserID, MovieID, Rating, Timestamp]``.
            k: number of nearest neighbours to keep.
            n: number of recommendations to keep.
        """
        self.movies = movies
        self.train_data, self.test_data = ratings[0], ratings[1]
        self.k = k
        self.n = n
        self.userDict = {}
        self.ItemUser = {}
        # [[similarity, user_id], ...], most similar first.
        self.neighbors = []
        # [[accumulated similarity, movie_id], ...], best first.
        self.recommandList = []
        # Movie ids in the overlap computed last by getPrecision.
        self.recommand = []
        # Movie ids the target user rated in each split.
        self.train_user = []
        self.test_user = []
        # Movie ids recommended to the target user from each split.
        self.train_rec = []
        self.test_rec = []
        # {movie_id: [[similarity, rating, neighbour_id], ...]}
        self.forecast = {}
        # {movie_id: [predicted]} -- cal_Mae appends the actual rating.
        self.score = {}
        # Two variants each; see getPrecision for their definitions.
        self.precision = [0.0, 0.0]
        self.recall = [0.0, 0.0]

    @staticmethod
    def _ratio(hits, total):
        """Return hits / total, or 0.0 when total is zero."""
        return hits / total if total else 0.0

    def formatRate(self, train_or_test):
        """Rebuild ``userDict`` and ``ItemUser`` from rating rows.

        Args:
            train_or_test: rows ``[UserID, MovieID, Rating, Timestamp]``.
                Ratings are stored as floats without normalisation.
        """
        self.userDict = {}
        self.ItemUser = {}
        for row in train_or_test:
            user_id, movie_id = row[0], row[1]
            self.userDict.setdefault(user_id, []).append(
                (movie_id, float(row[2])))
            self.ItemUser.setdefault(movie_id, []).append(user_id)

    def formatuserDict(self, userId, p):
        """Align the ratings of two users movie by movie.

        Returns:
            ``{movie_id: [rating by userId, rating by p]}`` over the union
            of movies either user rated; a missing rating is ``0``.

        Raises:
            KeyError: if either user is absent from ``userDict``.
        """
        user = {}
        for movie_id, rating in self.userDict[userId]:
            user[movie_id] = [rating, 0]
        for movie_id, rating in self.userDict[p]:
            if movie_id in user:
                user[movie_id][1] = rating
            else:
                user[movie_id] = [0, rating]
        return user

    def getCost(self, userId, p):
        """Return the cosine similarity between the ratings of two users.

        Returns ``0`` when the users share no rated movie (zero dot
        product), which also avoids dividing by a zero norm.
        """
        pairs = self.formatuserDict(userId, p).values()
        dot = sum(float(a) * float(b) for a, b in pairs)
        if dot == 0.0:
            return 0
        norm_a = sum(float(a) * float(a) for a, _ in pairs)
        norm_b = sum(float(b) * float(b) for _, b in pairs)
        return dot / sqrt(norm_a * norm_b)

    def getNearestNeighbor(self, userId):
        """Store the ``k`` users most similar to ``userId`` in ``neighbors``.

        Candidates are all other users who rated at least one movie that
        ``userId`` rated.  Result entries are ``[similarity, user_id]``,
        sorted in descending order.

        Raises:
            KeyError: if ``userId`` is absent from ``userDict``.
        """
        # A set keeps the de-duplication O(1) per candidate.
        candidates = set()
        for movie_id, _ in self.userDict[userId]:
            candidates.update(self.ItemUser[movie_id])
        candidates.discard(userId)
        ranked = sorted(
            ([self.getCost(userId, other), other] for other in candidates),
            reverse=True,
        )
        self.neighbors = ranked[:self.k]

    def getrecommandList(self, userId):
        """Build ``recommandList`` from the current neighbours.

        Each movie rated by a neighbour is weighted by the sum of the
        similarities of the neighbours who rated it; the top ``n`` entries
        ``[accumulated similarity, movie_id]`` are kept.  ``userId`` is
        accepted for interface compatibility and is not used.
        """
        weights = {}
        for dist, neighbor_id in self.neighbors:
            for movie_id, _ in self.userDict[neighbor_id]:
                weights[movie_id] = weights.get(movie_id, 0) + dist
        ranked = sorted(
            ([weight, movie_id] for movie_id, weight in weights.items()),
            reverse=True,
        )
        self.recommandList = ranked[:self.n]

    def getPrecision(self, userId):
        """Compute two precision/recall variants for ``userId``.

        The test split is processed first so that the neighbour and
        dictionary state left behind afterwards is that of the training
        split (``foreCast`` relies on this).

        Variant 0 compares the two recommendation lists:
        ``precision[0] = |test_rec & train_rec| / |train_rec|`` and
        ``recall[0] = |test_rec & train_rec| / |test_rec|``.
        Variant 1 compares training recommendations with the movies the
        user actually rated in the test split:
        ``precision[1] = |train_rec & test_user| / |train_rec|`` and
        ``recall[1] = |train_rec & test_user| / |test_user|``.
        A ratio with an empty denominator is reported as 0.0.
        """
        self.test_user, self.test_rec = self.getRecommand(
            self.test_data, userId)
        self.train_user, self.train_rec = self.getRecommand(
            self.train_data, userId)

        train_rec = set(self.train_rec)
        # Built from scratch so repeated calls do not accumulate stale ids.
        self.recommand = [m for m in self.test_rec if m in train_rec]
        hits = len(self.recommand)
        self.precision[0] = self._ratio(hits, len(self.train_rec))
        self.recall[0] = self._ratio(hits, len(self.test_rec))

        test_user = set(self.test_user)
        self.recommand = [m for m in self.train_rec if m in test_user]
        hits = len(self.recommand)
        self.precision[1] = self._ratio(hits, len(self.train_rec))
        self.recall[1] = self._ratio(hits, len(self.test_user))

    def getRecommand(self, train_or_test, userId):
        """Run the full pipeline on one split for ``userId``.

        Returns:
            ``(user, recommand)``: the movie ids ``userId`` rated in the
            split, and the recommended movie ids (without similarities).

        Raises:
            KeyError: if ``userId`` has no rating in the split.
        """
        self.formatRate(train_or_test)
        self.getNearestNeighbor(userId)
        self.getrecommandList(userId)
        user = [movie_id for movie_id, _ in self.userDict[userId]]
        recommand = [movie_id for _, movie_id in self.recommandList]
        return (user, recommand)

    def foreCast(self):
        """Predict ratings for the movies the user rated in the test split.

        For every test movie that was also recommended from the training
        split, the prediction is the similarity-weighted average of the
        neighbours' ratings, rounded to 3 decimals.  Every other test
        movie gets a prediction of ``0``.  Results go to ``score`` as
        ``{movie_id: [predicted]}``; the per-neighbour inputs are kept in
        ``forecast``.  Must be called after ``getPrecision``.
        """
        self.forecast = {}
        dist_by_neighbor = {uid: dist for dist, uid in self.neighbors}
        recommended = set(self.train_rec)
        for movie_id in self.test_user:
            if movie_id not in recommended or movie_id in self.forecast:
                continue
            entries = []
            for rater in self.ItemUser.get(movie_id, ()):
                if rater not in dist_by_neighbor:
                    continue
                # First rating this neighbour gave the movie.
                rating = next(
                    r for m, r in self.userDict[rater] if m == movie_id)
                entries.append([dist_by_neighbor[rater], rating, rater])
            self.forecast[movie_id] = entries

        self.score = {}
        for movie_id, entries in self.forecast.items():
            total_weight = sum(entry[0] for entry in entries)
            if total_weight:
                weighted = sum(entry[0] * entry[1] for entry in entries)
                self.score[movie_id] = [round(weighted / total_weight, 3)]
        # Test movies without a usable prediction count as zero.  (The
        # original tested `i not in movieid`, a substring check against the
        # last movie id, which skipped some of these movies.)
        for movie_id in self.test_user:
            self.score.setdefault(movie_id, [0])

    def cal_Mae(self, userId):
        """Attach the actual test ratings to the predictions.

        Rebuilds ``userDict``/``ItemUser`` from the test split and appends
        each actual rating of ``userId`` to the matching ``score`` entry.
        Despite the name, no mean absolute error is computed here.

        Returns:
            ``score`` as ``{movie_id: [predicted, actual]}``.

        Raises:
            KeyError: if ``userId`` has no rating in the test split.
        """
        self.formatRate(self.test_data)
        for movie_id, actual in self.userDict[userId]:
            if movie_id in self.score:
                self.score[movie_id].append(actual)
        return self.score
 
 
# Getting data
def readFile(filename):
    """Read a ``::``-delimited UTF-8 text file into a list of field lists.

    Each non-blank line is stripped of surrounding whitespace and split on
    ``"::"``.  Blank lines are skipped.

    Args:
        filename: path of the file to read.

    Returns:
        A list with one list of string fields per non-blank line.

    Raises:
        OSError: if the file cannot be opened.
    """
    # The context manager closes the file; the original close() call sat
    # after the return statement and was never reached.
    with open(filename, "r", encoding="utf-8") as files:
        return [line.strip().split("::") for line in files if line.strip()]
def load_dict_from_file(filepath):
    """Load a ``key:value``-per-line UTF-8 text file into a dict of strings.

    Each non-blank line is split on its first ``:`` only, so values that
    themselves contain colons are preserved.  If the file cannot be opened,
    a message is printed and an empty dict is returned.

    Args:
        filepath: path of the file to read.

    Returns:
        ``{key: value}`` with both sides as strings.

    Raises:
        ValueError: if a non-blank line contains no ``:``.
    """
    _dict = {}
    try:
        with open(filepath, 'r', encoding="utf-8") as dict_file:
            for line in dict_file:
                line = line.strip()
                if not line:
                    continue
                key, value = line.split(':', 1)
                _dict[key] = value
    except IOError:
        print("File %s does not exist." % (filepath))
    return _dict
def save_dict_to_file(_dict, filepath):
    """Write a dict to a UTF-8 text file, one ``key:value`` line per item.

    Keys and values are formatted with ``%s``.  If the file cannot be
    opened or written, a message is printed and the error is not raised.

    Args:
        _dict: mapping to save.
        filepath: destination path; an existing file is overwritten.
    """
    try:
        # Canonical codec name, matching the other file helpers.
        with open(filepath, 'w', encoding="utf-8") as dict_file:
            dict_file.writelines(
                '%s:%s\n' % (key, value) for key, value in _dict.items())
    except IOError:
        print("File %s could not be created." % (filepath))
def writeFile(data, filename):
    """Write the string ``data`` to ``filename`` as UTF-8, overwriting it.

    Args:
        data: text to write.
        filename: destination path.

    Raises:
        OSError: if the file cannot be opened or written.
    """
    with open(filename, 'w', encoding="utf-8") as f:
        # The listing showed a bare `(data)` here, which wrote nothing.
        f.write(data)
 
 
# ------------------------- start -------------------------------
 
def start3(): 
 """Driver: build a CF model with k=20 and print results for user '1000'.

 Prints the value returned by demo.cal_Mae(userId), the total number of
 rating rows read, two empty lines and an elapsed-time line.
 """
 # NOTE(review): this listing appears to have lost text during extraction.
 # As written, `start1 = ()` and `end1 = ()` bind empty tuples, so the final
 # `end1 - start1` raises TypeError; presumably both were time.time() calls
 # -- confirm against the original source.
 start1 = () 
 # NOTE(review): all three paths are the same bare directory string; the
 # file names are missing and cannot be recovered from this file.
 movies = readFile("D:/d/") 
 ratings = [readFile("D:/d/"),readFile("D:/d/")] 
 demo = CF(movies, ratings, k=20) 
 userId = '1000' 
 # NOTE(review): `(userId)` and `()` below are no-op expressions; presumably
 # method calls on `demo` whose names were stripped -- TODO confirm.
 (userId) 
## print(()) 
 () 
 print(demo.cal_Mae(userId)) 
## (ID) ## The previous line only supports a fixed user; this call would allow querying any user, and could later be wrapped in a loop that keeps querying until the caller stops.
 print("Data processed is %d entries." % (len(ratings[0])+len(ratings[1]))) 
## print("____---",len(ratings[0]),len(ratings[1])) 
## print("Accuracy: %.2f %%" % ( * 100))
## print("Recall: %.2f %%" % ( * 100))
 print() 
 print() 
 end1 = () 
 print("Elapsed time: %f s" % (end1 - start1)) 
def start1(): 
 """Driver: build a CF model with k=20 and write two files via writeFile."""
 # NOTE(review): this listing appears to have lost text during extraction.
 # `start1 = ()` binds an empty tuple (unused afterwards); presumably a
 # time.time() call -- TODO confirm.
 start1 = () 
 # NOTE(review): all paths below are bare directory strings; the file names
 # are missing and cannot be recovered from this file.
 movies = readFile("D:/d/") 
 ratings = [readFile("D:/d/"),readFile("D:/d/")] 
 demo = CF(movies, ratings, k = 20) 
 # NOTE(review): `(ratings[0])` is a no-op expression and `str()` evaluates
 # to the empty string; presumably a `demo` method call followed by
 # str(<attribute of demo>) for two dictionaries -- TODO confirm.
 (ratings[0]) 
 writeFile(str(),"D:/d/dd/") 
 writeFile(str(), "D:/d/dd/") 
## save_dict_to_file(,"D:/d/dd/") 
## save_dict_to_file(,"D:/d/dd/") 
 print("Processing complete.") 
## with open("D:/d/dd/",'r',encoding = 'utf-8') as f: 
##  diction = () 
##  i = 0 
##  for j in eval(diction): 
##   print(j) 
##   i += 1 
##   if i == 4: 
##    break 
def start2(): 
 """Driver: build a CF model with k=20, call demo.formatRate_toMovie on the
 first rating set, and write one file via writeFile."""
 # NOTE(review): this listing appears to have lost text during extraction.
 # `start1 = ()` binds an empty tuple (unused afterwards); presumably a
 # time.time() call -- TODO confirm.
 start1 = () 
 # NOTE(review): all paths below are bare directory strings; the file names
 # are missing and cannot be recovered from this file.
 movies = readFile("D:/d/") 
 ratings = [readFile("D:/d/"),readFile("D:/d/")] 
 demo = CF(movies, ratings, k = 20) 
 # NOTE(review): the CF class in this file defines no method named
 # formatRate_toMovie, so this call would raise AttributeError unless the
 # method is supplied elsewhere -- verify.
 demo.formatRate_toMovie(ratings[0]) 
 # NOTE(review): `str()` evaluates to the empty string; presumably
 # str(<attribute of demo>) -- TODO confirm.
 writeFile(str(),"D:/d/dd/") 
## writeFile(str(),"D:/d/dd/") 
## writeFile(str(), "D:/d/dd/") 
## save_dict_to_file(,"D:/d/dd/") 
## save_dict_to_file(,"D:/d/dd/") 
 print("Processing complete.")  
 
# Script entry point: when the file is executed directly, run start1().
if "__main__" == __name__:
    start1()

This is the whole content of this article.