Rock's Saying Logo1 border Rock's Saying Logo2
隨手行文 | 閒語集 | 過客留言 | 飛鴿傳書

作者: rock (遊手好閒的石頭成)
E-Mail: shirock@educities.edu.tw

unix 多工程式設計技術 -

如何以 semaphore 進行資源的共用互斥鎖定。

注意: 本文中所摘錄的程式,屬於 Firebird BBS, The TIP Project 的一部份,
使用時必須依循 GNU GPL 。

在 unix 系統中,資源的共用互斥,通常以檔案的鎖定最為常見,也最為 容易,只要使用 flock() 即可對檔案進行 LOCK_SH (共享鎖定)、 LOCK_EX (互斥鎖定),但是其他的資源就沒怎麼好了,例如共享記憶體,本身完全 沒有提供鎖定功能,此時就必須借助 semaphore 了。

但是 semaphore 只有三個函數: semget(), semctl(), semop() ,說明又 少少的,該如何使用呢?

本文即在說明,如何以 semaphore 實作一個如 flock() 具有 LOCK_SH, LOCK_EX, LOCK_UN, LOCK_NB 四種參數的「資源共用互斥鎖定函數」。

參考書籍:

新洞悉UNIX系列叢書 - 系統程式設計篇, 劉祖亮著,和碩科技出版
第 414 到 419 頁∼∼∼只有五頁就可以搞死一票人了∼∼∼。

參考文件:

你也可以用 manpage 查詢。 還有,文件雖然是 FreeBSD 的,但 並不表示只有 FreeBSD 才支援這三個函式,至少在 Solaris 及 Linux 中也都有。

說明文件夾在程式中。

sample
上圖是假設有 1 、 2 、 3 三個 process 打算上鎖的四個階段。
其中被紅色方框圍住的動作,表示被擱置了。

  1. process 1 想加上共享鎖(SHLOCK)。
    1. process 1 先鎖住 EXLOCK ,再加上 SHLOCK 。
    2. process 2 想鎖住 EXLOCK ,但被擱置。
    3. process 3 想鎖住 EXLOCK ,但被擱置。
  2. process 1 完成上鎖動作。
    1. process 1 加上 SHLOCK 後,解開 EXLOCK ,完成上鎖動作。
    2. process 2 鎖住 EXLOCK ,但被擱置,等 process 1 之前加上的 SHLOCK 解開。
    3. process 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  3. process 1 解鎖,process 2 結束等待,完成上鎖動作。
    1. process 1 解開 SHLOCK 。
    2. process 2 已無 SHLOCK , process 2 結束擱置狀態,完成上鎖動作。
    3. process 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  4. process 2 解鎖,process 3 結束等待,完成上鎖。
    1. process 2 解開 EXLOCK 。
    2. process 3 鎖住 EXLOCK ,再加上 SHLOCK ,最後解開 EXLOCK ,完成上鎖動作。
擱置動作是系統視情形自動在做的。

程式是從TIP BBS library 2 中的 bcache.c 中擷取出來的, bcache.c 是一個共享記憶體使用模組,並利用 semaphore 來實作其中的 lock_shm() 功能。

程式屬於 Firebird BBS, The TIP Project 的一部份。


/* Firebird BBS, The TIP Project. 資料快取模組: bcache.c */ 
#include <stdlib.h>  
#include <sys/stat.h> 
#include <unistd.h> 
#include <sys/file.h> 
#include <sys/types.h>
#include <sys/ipc.h> 
#include "bbsdefs.h" 
#include "config.h" 
#include "struct.h"  
/*NAME: lock_shm() write by rock 鎖定共享記憶體。
op: 
  LOCK_SH 加上共享鎖定(讀取鎖定) 
  LOCK_EX 加上互斥鎖定(寫入鎖定)
  LOCK_NB 不擱置( non-blocking ,配合前兩者使用 )
  LOCK_UN 解除鎖定 此函數提供了一個近似 flock() 行為的 shm 鎖定函數。
  lock_shm() 並不是直接對 shm 鎖定,事實上 unix 並沒有鎖定 shm 的
  動作,所以是用 sem 的互斥動作,來進行對 shm 的鎖定。  換言之, 
  lock_shm() 不只是個「共享記憶體鎖」,更是一個「共用互斥 資源鎖」。
  「共用互斥」英文為: Mutual exclusion - 即可共用、也可排他。  
  可以很多人一起使用,也可以只給自已用,而排除他人的使用權利。 
  實作方式: 因為同時要應付 LOCK_EX 及 LOCK_SH 兩種情形,故需要兩
  個 sem 為一組 來控制,一個 sem 控制 EXLOCK (exclusion lock) ,另一
  個 sem 控制 SHLOCK (shared lock)。 因為  EXLOCK 的特性是,一次只能
  用一個,故此 sem 的初值設為 1 ,而 SHLOCK 則是可以同時使用好幾個,
  故此 sem 的初值設為 0 。 還有一點要注意的是,  EXLOCK 及 SHLOCK 不
  會同時存在,當有 SHLOCK 被鎖上時,想鎖上 EXLOCK 的動作,就必須等待
  (被擱置)到 SHLOCK 全部 都已解除了,而後來的鎖上  SHLOCK 的動作,又
  必須等待前一個排隊中的 EXLOCK 動作完成及解除後,才可以鎖上。 亦即:
  後來的 EXLOCK 必須等前面的 SHLOCK 全部解除,後來的  SHLOCK 必須等
  前面的 EXLOCK 解除。 
  如何鎖上 SHLOCK: 
  1.鎖上 EXLOCK: ( EXLOCK.sem_op == -1 ) 等待先前的  EXLOCK 解除,及
    防止後來的上鎖動作插隊。 當 EXLOCK = 0 時,試圖將其 -1 的動作,
    將會被系統擱置,直到 EXLOCK 的值大於等於 -1  的絕對值。 
  2.鎖上 SHLOCK: ( SHLOCK.sem_op == +1 ) 因為 SHLOCK 可以同時有好幾
    個,所以上鎖動作是用加的。 SHLOCK  的值,會等於目前取得 SHLOCK 
    的程序的數目。 
  3.解除 EXLOCK: ( EXLOCK.sem_op == +1 ) 讓後來的上鎖動作可以繼續動作。  
    但是後到的 LOCK_EX 將會等待 SHLOCK 的值變為 0 ,而在它等待的 時間
    裡,後到的 LOCK_SH 將會等待它解除 EXLOCK 的鎖定。 
  如何鎖上  EXLOCK: 
  1.鎖上 EXLOCK: ( EXLOCK.sem_op == -1 ) 如果 EXLOCK = 1 ,則 -1 將會
    被系統加到 EXLOCK  中, EXLOCK 變為 0 ( 1+ -1 = 0 )。 如果 
    EXLOCK = 0 ,則試圖 -1 的動作,將會被擱置。 
  2.等待 SHLOCK 全部解除:  ( SHLOCK.sem_op == 0 ) 等待 SHLOCK 的值變
    為 0 ,亦即所有的 SHLOCK 都已解除。 第一步都先鎖上 EXLOCK ,就是
    為了防止後來的上鎖動作插隊。  
  如何解除鎖定: 這講起來比較麻煩,我就懶得寫了,看個人經驗吧。 
  1.if ( EXLOCK =1 and SHLOCK > 0 ) then SHLOCK  -1 (解除 SHLOCK) 
  2.if ( EXLOCK =0 and has process wait SHLOCK be zero) then 
    SHLOCK  -1 (解除 SHLOCK) 
  3.if ( EXLOCK =0 but no process wait SHLOCK be zero) then 
    EXLOCK  +1 (解除 EXLOCK) 
  BUG: 因為實作上的技術問題,一個設計不良的 LOCK_UN 動作,可能會 
  破壞掉目前正在等待取得鎖定的程序隊伍。 例如:  process a 尚未取
  得任何 LOCK_EX 或 LOCK_SH ,卻要求 LOCK_UN ,此時會誤將 process b 
  所取得的鎖定給解除了。 如果此時有一  process c 正等待 process b 解
  除鎖定,就會誤以為 process b 已經解除鎖定(實際上是被 process a 越
  過界給解除了),開始存取資源,造成 process b 和 process c 互搶資源。 

  RC: 成功(0)、失敗(-1) See also: semget(), semctl(),  semop() 
*/
int lock_shm(const char*shmname,int op) {
#ifndef OS2EMX
  enum { EXLOCK,  SHLOCK} lock;
  struct sembuf lockop={0, 0, SEM_UNDO} /* sem 操作指令*/;
  int semkey,  semid;
  ushort sem_val[] = {1 /*Init value of EXLOCK*/, 0 /*Init value of SHLOCK*/};
  if( (semkey=IPC_key(shmname,'B')) < 0) return -1; //嘗試建立 sem 
  if((semid=semget(semkey,  2, IPC_CREAT|IPC_EXCL|0640)) >= 0) { 
    if( semctl(semid, 0, SETALL, sem_val) < 0)	 // 初始 sem 的值 
      return -1; 
  } else { 
    if((semid=semget(semkey,1,0)) < 0) //sem 可能建立了,取得semid  
      return -1; 
  } 
/*{ 
ushort sval[2]; 
semctl(semid,0,GETALL,sval); 
printf("[%d] Value  of EXLOCK:%d, Value of SHLOCK:%d\n",semid, sval[EXLOCK],sval[SHLOCK]); 
}*/ 
/*  non-blocking 鎖定,只要多設一個旗標 IPC_NOWAIT 即達目的。 
連傳回值也一樣:當無法立即取得對 sem 的操作時, semop() 不會被擱置,  
會立即返回(傳回 -1 ),並設定 errno 為 EAGAIN 。 */ 
if( op & LOCK_NB ) 
  lockop.sem_flg |= IPC_NOWAIT;  
if( op & LOCK_EX ) { 
  lockop.sem_num = EXLOCK; 
  lockop.sem_op = -1; 
  if( semop(semid,  &lockop, 1) < 0 ) 
    return -1; 
  //printf("EXLOCK... wait SHLOCK be zero\n"); 
  lockop.sem_num  = SHLOCK; 
  lockop.sem_op = 0; 
  return semop(semid, &lockop, 1); 
} else if( op &  LOCK_SH ) { 
  lockop.sem_num = EXLOCK; 
  lockop.sem_op = -1; 
  if( semop(semid, &lockop,  1) < 0 ) 
    return -1; 
  //printf("EXLOCK... wait to get SHLOCK\n"); 
  lockop.sem_num  = SHLOCK; 
  lockop.sem_op = 1; 
  if( semop(semid, &lockop, 1) < 0 ) 
    return -1;
  //printf("SHLOCK  ok\n"); 
  lockop.sem_num = EXLOCK; 
  lockop.sem_op = 1; 
  //printf("Release EXLOCK\n");  
  return semop(semid, &lockop, 1); 
} 
//以下動作皆為 LOCK_UN 
if( semctl(semid, 0, GETALL,  sem_val) < 0 ) 
  return -1; 
//printf("LOCK_UN, EXLOCK:%d, SHLOCK:%d\n",sem_val[0],  sem_val[1]); 
/* 如果沒有程序在等待取得鎖定的話,那解除鎖定的動作,將會是"刪
除 sem ",而不是"改變 sem 的值"。 如果有程序等待取得  LOCK_EX 的
話,那就是在等待 SHLOCK 的值變 為 0 。 如果有程序等待取得 
LOCK_SH 的話,那就是在等待 EXLOCK 的值增 加。 */  
  if( semctl(semid, SHLOCK, GETZCNT, 0) == 0 && semctl(semid, EXLOCK, GETNCNT, 0)  == 0 ) { 
  //printf("may remove..."); 
    if( sem_val[EXLOCK] <= 1 && sem_val[SHLOCK]  == 0) { 
    /*printf("remove sem %d,Z:%d,N:%d\n", semid, 
      semctl(semid,SHLOCK,GETZCNT,0),
      semctl(semid,EXLOCK,GETNCNT,0));  
    */ 
    return semctl(semid, 0, IPC_RMID, 0); 
    } 
} 
if( sem_val[EXLOCK] >0 && sem_val[SHLOCK]  > 0 ) {
  lockop.sem_num = SHLOCK; 
  lockop.sem_op = -1; 
  //printf("EXLOCK >0 and SHLOCK  >0: UNLOCK SHLOCK\n"); 
  return semop(semid, &lockop, 1); 
} else if( sem_val[EXLOCK]  <= 0 ) { 
  if( semctl(semid, SHLOCK, GETZCNT, NULL) >0 ) { 
    lockop.sem_num = SHLOCK;  
	lockop.sem_op = -1; 
	//printf("UNLOCK SHLOCK\n"); 
  } else { 
    lockop.sem_num = EXLOCK;  
	lockop.sem_op = 1; 
	//printf("UNLOCK EXLOCK\n"); 
  } 
  return semop(semid, &lockop,  1); 
} 
return 0; 
#else 
return 0; /* 尚未實作 */ 
#endif } 

以下是測試程式: testlock.c

可以下指令:

#testlock a & testlock b & testlock c&
run...
#testlock d& testlock e&

先跑幾隻丟到背景,等執行一部份後,再跑幾隻到背景,就可 以觀察數隻程式利用 semaphore 協調執行順序的情形了。


#include <stdio.h> 
#include <stdlib.h> 
#include <sys/types.h> 
#include <unistd.h>  
#include "bcache.h" 
void dump(char *prog) {
  char buf[80]; 
  sprintf(buf,"dump:[%s].",prog);  
  write(1, buf, strlen(buf)); 
  sleep(1); 
} 
void main(int argc, char* argv[]) { 
  char  shmname[] = "TEST_BCACHE"; 
  int i; 
  char *prog; 
  prog= argv[1]; 
  printf("[%s]LOCK_EX:%d\n",  prog, lock_shm(shmname,LOCK_EX) ); 
  for(i = 0; i < 5; i++) 
    dump(prog); 
  printf("\n[%s]LOCK_UN:%d\n",  prog, lock_shm(shmname,LOCK_UN) ); 
  printf("[%s]LOCK_SH:%d\n", prog, lock_shm(shmname,LOCK_SH)  ); 
  for(i =0; i <5; i++) 
    dump(prog); 
  printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  ); 
  printf("[%s]LOCK_EX:%d\n", prog, lock_shm(shmname,LOCK_EX) ); 
  for(i =0; i <5;  i++) 
    dump(prog); 
  printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  ); 
} 


隨手行文 | 閒語集 | 過客留言 | 飛鴿傳書 石頭閒語 (http://home.educities.edu.tw/shirock/)
Copyright (C) 1999 - 2002 遊手好閒的石頭成 更新日期: 1999年10月1日