My Project
rpl_rli_pdb.h
00001 /* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
00015 
00016 #ifndef RPL_RLI_PDB_H
00017 
00018 #define RPL_RLI_PDB_H
00019 
00020 #ifdef HAVE_REPLICATION
00021 
00022 #include "sql_string.h"
00023 #include "rpl_rli.h"
00024 #include <my_sys.h>
00025 #include <my_bitmap.h>
00026 #include "rpl_slave.h"
00027 
00039 /* Assigned Partition Hash (APH) entry: ties a database name to its assigned Worker */
00040 typedef struct st_db_worker_hash_entry
00041 {
00042   uint  db_len;            // NOTE(review): presumably the length of db - confirm
00043   const char *db;          // name of the database this entry describes
00044   Slave_worker *worker;    // the Worker assigned to the database
00045   /*
00046     The number of transactions pending on this database.
00047     This should only be modified under the lock slave_worker_hash_lock.
00048    */
00049   long usage;
00050   /*
00051     The list of temp tables belonging to the @c db database is
00052     attached to an assigned @c worker to become its thd->temporary_tables.
00053     The list is updated with every DDL, incl. CREATE and DROP.
00054     It is removed from the entry and merged into the coordinator's
00055     thd->temporary_tables in case of events: slave stops, APH oversize.
00056   */
00057   TABLE* volatile temporary_tables;
00058 
00059   /* todo: relax concurrency to mimic record-level locking.
00060      That is, augment the entry with a mutex/cond pair
00061      pthread_mutex_t
00062      pthread_cond_t
00063      timestamp updated_at; */
00064 
00065 } db_worker_hash_entry;
00066 /* Declarations only: none of the hash/Worker helpers below is defined in this header. */
00067 bool init_hash_workers(ulong slave_parallel_workers);
00068 void destroy_hash_workers(Relay_log_info*);
00069 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
00070                                db_worker_hash_entry **ptr_entry,
00071                                bool need_temp_tables, Slave_worker *w);
00072 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
00073 int wait_for_workers_to_finish(Relay_log_info const *rli,
00074                                Slave_worker *ignore= NULL);  // ignore defaults to NULL
00075 
00076 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for the CGEP (Current Group Executed Partitions) dynarray
00077 /* NOTE(review): the two identifying fields are not named in this header - confirm at the point of use */
00078 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
00079 /* Wrapper around a single untyped job pointer. */
00080 typedef struct slave_job_item
00081 {
00082   void *data;   // untyped payload; NOTE(review): presumably the event queued for a Worker - confirm
00083 } Slave_job_item;
00084 /* Circular queue stored in the DYNAMIC_ARRAY Q; entry == size means empty, avail == size means full. */
00092 class circular_buffer_queue
00093 {
00094 public:
00095 
00096   DYNAMIC_ARRAY Q;
00097   ulong size;           // the Size of the queue in terms of elements
00098   ulong avail;          // first Available index to append at (next to tail)
00099   ulong entry;          // the head index or the entry point to the queue.
00100   volatile ulong len;   // actual length
00101   bool inited_queue;    // TRUE once Q was set up; guards delete_dynamic() in the destructor
00102   /* Both constructors leave every index member defined; only the first one sets up Q. */
00103   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
00104     size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
00105   {
00106     DBUG_ASSERT(size < (ulong) -1);
00107     if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
00108       inited_queue= TRUE;
00109   }
00110   circular_buffer_queue () : size(0), avail(0), entry(0), len(0), inited_queue(FALSE) {}
00111   ~circular_buffer_queue ()
00112   {
00113     if (inited_queue)
00114       delete_dynamic(&Q);
00115   }
00116   /* The queue operations below are only declared in this header. */
00125   ulong de_queue(uchar *);
00129   ulong de_tail(uchar *val);
00130 
00136   ulong en_queue(void *item);
00140   void* head_queue();
00141   bool   gt(ulong i, ulong k); // comparison of ordering of two entities
00142   /* index is within the valid range; the occupied range may wrap around the end of Q */
00143   bool in(ulong k) { return !empty() && 
00144       (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
00145   bool empty() { return entry == size; }
00146   bool full() { return avail == size; }
00147 };
00148 /* Bookkeeping record for one group (see Slave_committed_queue::get_job_group()). */
00149 typedef struct st_slave_job_group
00150 {
00151   char *group_master_log_name;   // (actually redundant)
00152   /*
00153     T-event log_pos filled by Worker for CheckPoint (CP)
00154   */
00155   my_off_t group_master_log_pos;
00156 
00157   /* 
00158      When the relay-log name changes, allocates and fills in a new relay-log
00159      name; otherwise it fills in NULL.
00160      Coordinator keeps track of whether each Worker has been notified of the
00161      update, to make sure the routine runs once per change.
00162 
00163      W checks the value at commit and memorizes a not-NULL.
00164      Freeing unless NULL is left to Coordinator at CP.
00165   */
00166   char     *group_relay_log_name; // The value is last seen relay-log 
00167   my_off_t group_relay_log_pos;  // filled by W
00168   ulong worker_id;
00169   Slave_worker *worker;
00170   ulonglong total_seqno;
00171 
00172   my_off_t master_log_pos;       // B-event log_pos
00173   /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
00174   uint  checkpoint_seqno;
00175   my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
00176   char*    checkpoint_log_name;
00177   my_off_t checkpoint_relay_log_pos; // T-event log_pos filled by W for CheckPoint
00178   char*    checkpoint_relay_log_name;
00179   volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
00180   ulong    shifted;     // shift the last CP bitmap at receiving a new CP
00181   time_t   ts;          // Group's timestamp to update Seconds_behind_master
00182 #ifndef DBUG_OFF
00183   bool     notified;    // to debug group_master_log_name change notification
00184 #endif
00185   /*
00186     Coordinator fills the struct with defaults and options at the start of
00187     a group distribution. master_pos and seqno are stored as given; the
00188     members worker, shifted and ts are left untouched by this routine. */
00189   void reset(my_off_t master_pos, ulonglong seqno)
00190   {
00191     master_log_pos= master_pos;
00192     group_master_log_pos= group_relay_log_pos= 0;
00193     group_master_log_name= NULL; // todo: remove
00194     group_relay_log_name= NULL;
00195     worker_id= MTS_WORKER_UNDEF;
00196     total_seqno= seqno;
00197     checkpoint_log_name= NULL;
00198     checkpoint_log_pos= 0;
00199     checkpoint_relay_log_name= NULL;
00200     checkpoint_relay_log_pos= 0;
00201     checkpoint_seqno= (uint) -1;  // all-ones value of uint
00202     done= 0;
00203 #ifndef DBUG_OFF
00204     notified= false;
00205 #endif
00206   }
00207 } Slave_job_group;
00208 /* Circular queue of Slave_job_group records (the GAQ) with checkpoint bookkeeping. */
00214 class Slave_committed_queue : public circular_buffer_queue
00215 {
00216 public:
00217   
00218   bool inited;          // TRUE only after the constructor acquired all its resources
00219 
00220   /* master's Rot-ev exec */
00221   void update_current_binlog(const char *post_rotate);
00222 
00223   /*
00224      The last checkpoint time Low-Water-Mark
00225   */
00226   Slave_job_group lwm;
00227   
00228   /* last time processed indexes for each worker */
00229   DYNAMIC_ARRAY last_done;
00230 
00231   /* the being assigned group index in GAQ */
00232   ulong assigned_group_index;
00233   /* Leaves inited FALSE when max is out of range, the base queue failed to set up, or the name buffer cannot be allocated. */
00234   Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
00235                          uint inc= 0)
00236     : circular_buffer_queue(el_size, max, inc), inited(FALSE)
00237   {
00238     uint k;
00239     ulonglong l= 0;
00240     if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
00241       return;
00242     lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
00243     if (!lwm.group_relay_log_name)
00244       return;  // out of memory: inited stays FALSE, nothing else to release here
00245     lwm.group_relay_log_name[0]= 0;
00246     my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
00247     for (k= 0; k < n; k++)
00248       insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
00249     inited= TRUE;  // set last, so the destructor only releases what was acquired
00250   }
00251 
00252   ~Slave_committed_queue ()
00253   { 
00254     if (inited)
00255     {
00256       delete_dynamic(&last_done);
00257       my_free(lwm.group_relay_log_name);
00258       free_dynamic_items();  // free possibly left allocated strings in GAQ list
00259     }
00260   }
00261 
00262 #ifndef DBUG_OFF
00263   bool count_done(Relay_log_info* rli);
00264 #endif
00265 
00266   /* Checkpoint routine refreshes the queue */
00267   ulong move_queue_head(DYNAMIC_ARRAY *ws);
00268   /* Method is for slave shutdown time cleanup */
00269   void free_dynamic_items();
00270   /* 
00271      returns a pointer to Slave_job_group struct instance as indexed by arg
00272      in the circular buffer dyn-array 
00273   */
00274   Slave_job_group* get_job_group(ulong ind)
00275   {
00276     return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
00277   }
00278   /* Forwards to the base en_queue() and records its return value in assigned_group_index. */
00283   ulong en_queue(void *item)
00284   {
00285     return assigned_group_index= circular_buffer_queue::en_queue(item);
00286   }
00287 
00288 };
00289 /* A circular_buffer_queue extended with overfill bookkeeping. */
00290 class Slave_jobs_queue : public circular_buffer_queue
00291 {
00292 public:
00293 
00294   /* 
00295      Coordinator marks with true, Worker signals back when the queue is
00296      available again
00297   */
00298   bool overfill;
00299   ulonglong waited_overfill;  // NOTE(review): presumably counts waits caused by overfill - confirm
00300 };
00301 /* Per-Worker replication state: extends Relay_log_info and owns the Worker's jobs queue. */
00302 class Slave_worker : public Relay_log_info
00303 {
00304 public:
00305   Slave_worker(Relay_log_info *rli
00306 #ifdef HAVE_PSI_INTERFACE
00307                ,PSI_mutex_key *param_key_info_run_lock,
00308                PSI_mutex_key *param_key_info_data_lock,
00309                PSI_mutex_key *param_key_info_sleep_lock,
00310                PSI_mutex_key *param_key_info_data_cond,
00311                PSI_mutex_key *param_key_info_start_cond,
00312                PSI_mutex_key *param_key_info_stop_cond,
00313                PSI_mutex_key *param_key_info_sleep_cond
00314 #endif
00315                , uint param_id
00316               );
00317   virtual ~Slave_worker();
00318 
00319   Slave_jobs_queue jobs;   // assignment queue containing events to execute
00320   mysql_mutex_t jobs_lock; // mutex for the jobs queue
00321   mysql_cond_t  jobs_cond; // condition variable for the jobs queue
00322   Relay_log_info *c_rli;   // pointer to Coordinator's rli
00323   DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
00324   bool curr_group_seen_begin; // is set to TRUE with explicit B-event
00325   ulong id;                 // numeric identifier of the Worker
00326 
00327   /*
00328     Worker runtime statistics
00329   */
00330   // the index in GAQ of the last processed group by this Worker
00331   volatile ulong last_group_done_index;
00332   ulonglong last_groups_assigned_index; // index of previous group assigned to worker
00333   ulong wq_empty_waits;  // how many times got idle
00334   ulong events_done;     // how many events (statements) processed
00335   ulong groups_done;     // how many groups (transactions) processed
00336   volatile int curr_jobs; // number of active  assignments
00337   // number of partitions allocated to the worker at point in time
00338   long usage_partition;
00339   // symmetric to rli->mts_end_group_sets_max_dbs
00340   bool end_group_sets_max_dbs;
00341 
00342   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
00343   volatile bool checkpoint_notified; // Coord sets and resets, W can read
00344   volatile bool master_log_change_notified; // Coord sets and resets, W can read
00345   ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
00346   // WQ current excess above the overrun level
00347   long wq_overrun_cnt;
00348   /*
00349     number of events starting from which Worker queue is regarded as
00350     close to full. The number of the excessive events yields a weight factor
00351     to compute Coordinator's nap.
00352   */
00353   ulong overrun_level;
00354   /*
00355      reverse to overrun: the number of events below which Worker is
00356      considered underrunning
00357   */
00358   ulong underrun_level;
00359   /*
00360     Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
00361     When WQ length is dropped below overrun the counter is reset.
00362   */
00363   ulong excess_cnt;
00364   /*
00365     Coordinates of the last CheckPoint (CP) this Worker has
00366     acknowledged; part of it is persistent data
00367   */
00368   char checkpoint_relay_log_name[FN_REFLEN];
00369   ulonglong checkpoint_relay_log_pos;
00370   char checkpoint_master_log_name[FN_REFLEN];
00371   ulonglong checkpoint_master_log_pos;
00372   MY_BITMAP group_executed; // bitmap describes groups executed after last CP
00373   MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
00374   ulong checkpoint_seqno;   // the most significant ON bit in group_executed
00375   enum en_running_state
00376   {
00377     NOT_RUNNING= 0,
00378     RUNNING= 1,
00379     ERROR_LEAVING= 2,         // is set by Worker
00380     STOP= 3,                  // is set by Coordinator upon receiving STOP
00381     STOP_ACCEPTED= 4          // is set by worker upon completing job when STOP SLAVE is issued
00382   };
00383   /*
00384     The running status is guarded by jobs_lock mutex that a writer
00385     Coordinator or Worker itself needs to hold when write a new value.
00386   */
00387   en_running_state volatile running_status;
00388   /*
00389     exit_incremented indicates whether worker has contributed to max updated index.
00390     By default it is set to false. When the worker contributes for the first time this
00391     variable is set to true.
00392   */
00393   bool exit_incremented;
00394 
00395   int init_worker(Relay_log_info*, ulong);
00396   int rli_init_info(bool);
00397   int flush_info(bool force= FALSE);
00398   static size_t get_number_worker_fields();
00399   void slave_worker_ends_group(Log_event*, int);
00400   const char *get_master_log_name();
00401   ulonglong get_master_log_pos() { return master_log_pos; };
00402   ulonglong set_master_log_pos(ulonglong val) { return master_log_pos= val; }; // same width as the member: no narrowing
00403   bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
00404   /*
00405     When commit fails clear bitmap for executed worker group. Revert back the
00406     positions to the old positions that existed before commit using the checkpoint.
00407 
00408     @param Slave_job_group a pointer to Slave_job_group struct instance which
00409     holds group master log pos, group relay log pos and checkpoint positions.
00410   */
00411   void rollback_positions(Slave_job_group *ptr_g);
00412   bool reset_recovery_info();
00419   void set_rli_description_event(Format_description_log_event *fdle)
00420   {
00421     DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
00422 #ifndef DBUG_OFF
00423     if (fdle)
00424       mysql_mutex_assert_owner(&jobs_lock);
00425 #endif
00426     // a non-NULL event is first passed to adapt_to_master_version(); NULL just clears the pointer
00427     if (fdle)
00428       adapt_to_master_version(fdle);
00429     rli_description_event= fdle;
00430   }
00431 
00432   inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
00433   inline void set_gaq_index(ulong val)
00434   { 
00435     if (gaq_index == c_rli->gaq->size)  // only a reset index (== GAQ size) may be overwritten
00436       gaq_index= val;
00437   };
00438 
00439 protected:
00440 
00441   virtual void do_report(loglevel level, int err_code,
00442                          const char *msg, va_list v_args) const;
00443 
00444 private:
00445   ulong gaq_index;          // GAQ index of the current assignment 
00446   ulonglong master_log_pos; // event's cached log_pos for possible error report
00447   void end_info();
00448   bool read_info(Rpl_info_handler *from);
00449   bool write_info(Rpl_info_handler *to);
00450   Slave_worker& operator=(const Slave_worker& info);  // private: assignment is not available to users
00451   Slave_worker(const Slave_worker& info);             // private: copying is not available to users
00452 };
00453 /* Declarations only: free functions taking Workers, job queues and job items. */
00454 void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
00455 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
00456 bool set_max_updated_index_on_stop(Slave_worker *worker,
00457                                    Slave_job_item *job_item);
00458 /* NOTE(review): by name these move temp-table lists between a hash entry and a THD - confirm */
00459 TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
00460 TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
00461 #endif // HAVE_REPLICATION
00462 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines